mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 14:00:38 +00:00
control_plane/attachment_service: improve Scheduler (#6633)
## Problem One of the major shortcuts in the initial version of this code was to construct a fresh `Scheduler` each time we need it, which is an O(N^2) cost as the tenant count increases. ## Summary of changes - Keep `Scheduler` alive through the lifetime of ServiceState - Use `IntentState` as a reference tracking helper, updating Scheduler refcounts as nodes are added/removed from the intent. There is an automated test that checks things don't get pathologically slow with thousands of shards, but it's not included in this PR because tests that implicitly test the runner node performance take some thought to stabilize/land in CI.
This commit is contained in:
@@ -27,7 +27,7 @@ pub(super) struct Reconciler {
|
||||
pub(super) tenant_shard_id: TenantShardId,
|
||||
pub(crate) shard: ShardIdentity,
|
||||
pub(crate) generation: Generation,
|
||||
pub(crate) intent: IntentState,
|
||||
pub(crate) intent: TargetState,
|
||||
pub(crate) config: TenantConfig,
|
||||
pub(crate) observed: ObservedState,
|
||||
|
||||
@@ -62,6 +62,32 @@ pub(super) struct Reconciler {
|
||||
pub(crate) persistence: Arc<Persistence>,
|
||||
}
|
||||
|
||||
/// This is a snapshot of [`crate::tenant_state::IntentState`], but it does not do any
|
||||
/// reference counting for Scheduler. The IntentState is what the scheduler works with,
|
||||
/// and the TargetState is just the instruction for a particular Reconciler run.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct TargetState {
|
||||
pub(crate) attached: Option<NodeId>,
|
||||
pub(crate) secondary: Vec<NodeId>,
|
||||
}
|
||||
|
||||
impl TargetState {
|
||||
pub(crate) fn from_intent(intent: &IntentState) -> Self {
|
||||
Self {
|
||||
attached: *intent.get_attached(),
|
||||
secondary: intent.get_secondary().clone(),
|
||||
}
|
||||
}
|
||||
|
||||
fn all_pageservers(&self) -> Vec<NodeId> {
|
||||
let mut result = self.secondary.clone();
|
||||
if let Some(node_id) = &self.attached {
|
||||
result.push(*node_id);
|
||||
}
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum ReconcileError {
|
||||
#[error(transparent)]
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use crate::node::Node;
|
||||
use std::collections::HashMap;
|
||||
use utils::{http::error::ApiError, id::NodeId};
|
||||
|
||||
use crate::{node::Node, tenant_state::TenantState};
|
||||
|
||||
/// Scenarios in which we cannot find a suitable location for a tenant shard
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum ScheduleError {
|
||||
@@ -19,52 +17,95 @@ impl From<ScheduleError> for ApiError {
|
||||
}
|
||||
}
|
||||
|
||||
struct SchedulerNode {
|
||||
/// How many shards are currently scheduled on this node, via their [`crate::tenant_state::IntentState`].
|
||||
shard_count: usize,
|
||||
|
||||
/// Whether this node is currently elegible to have new shards scheduled (this is derived
|
||||
/// from a node's availability state and scheduling policy).
|
||||
may_schedule: bool,
|
||||
}
|
||||
|
||||
pub(crate) struct Scheduler {
|
||||
tenant_counts: HashMap<NodeId, usize>,
|
||||
nodes: HashMap<NodeId, SchedulerNode>,
|
||||
}
|
||||
|
||||
impl Scheduler {
|
||||
pub(crate) fn new(
|
||||
tenants: &BTreeMap<TenantShardId, TenantState>,
|
||||
nodes: &HashMap<NodeId, Node>,
|
||||
) -> Self {
|
||||
let mut tenant_counts = HashMap::new();
|
||||
for node_id in nodes.keys() {
|
||||
tenant_counts.insert(*node_id, 0);
|
||||
pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
|
||||
let mut scheduler_nodes = HashMap::new();
|
||||
for node in nodes {
|
||||
scheduler_nodes.insert(
|
||||
node.id,
|
||||
SchedulerNode {
|
||||
shard_count: 0,
|
||||
may_schedule: node.may_schedule(),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
for tenant in tenants.values() {
|
||||
if let Some(ps) = tenant.intent.attached {
|
||||
let entry = tenant_counts.entry(ps).or_insert(0);
|
||||
*entry += 1;
|
||||
Self {
|
||||
nodes: scheduler_nodes,
|
||||
}
|
||||
}
|
||||
|
||||
/// Increment the reference count of a node. This reference count is used to guide scheduling
|
||||
/// decisions, not for memory management: it represents one tenant shard whose IntentState targets
|
||||
/// this node.
|
||||
///
|
||||
/// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
|
||||
/// [`Self::new`] or [`Self::node_upsert`])
|
||||
pub(crate) fn node_inc_ref(&mut self, node_id: NodeId) {
|
||||
let Some(node) = self.nodes.get_mut(&node_id) else {
|
||||
tracing::error!("Scheduler missing node {node_id}");
|
||||
debug_assert!(false);
|
||||
return;
|
||||
};
|
||||
|
||||
node.shard_count += 1;
|
||||
}
|
||||
|
||||
/// Decrement a node's reference count. Inverse of [`Self::node_inc_ref`].
|
||||
pub(crate) fn node_dec_ref(&mut self, node_id: NodeId) {
|
||||
let Some(node) = self.nodes.get_mut(&node_id) else {
|
||||
debug_assert!(false);
|
||||
tracing::error!("Scheduler missing node {node_id}");
|
||||
return;
|
||||
};
|
||||
|
||||
node.shard_count -= 1;
|
||||
}
|
||||
|
||||
pub(crate) fn node_upsert(&mut self, node: &Node) {
|
||||
use std::collections::hash_map::Entry::*;
|
||||
match self.nodes.entry(node.id) {
|
||||
Occupied(mut entry) => {
|
||||
entry.get_mut().may_schedule = node.may_schedule();
|
||||
}
|
||||
Vacant(entry) => {
|
||||
entry.insert(SchedulerNode {
|
||||
shard_count: 0,
|
||||
may_schedule: node.may_schedule(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
for (node_id, node) in nodes {
|
||||
if !node.may_schedule() {
|
||||
tenant_counts.remove(node_id);
|
||||
}
|
||||
}
|
||||
|
||||
Self { tenant_counts }
|
||||
}
|
||||
|
||||
pub(crate) fn schedule_shard(
|
||||
&mut self,
|
||||
hard_exclude: &[NodeId],
|
||||
) -> Result<NodeId, ScheduleError> {
|
||||
if self.tenant_counts.is_empty() {
|
||||
if self.nodes.is_empty() {
|
||||
return Err(ScheduleError::NoPageservers);
|
||||
}
|
||||
|
||||
let mut tenant_counts: Vec<(NodeId, usize)> = self
|
||||
.tenant_counts
|
||||
.nodes
|
||||
.iter()
|
||||
.filter_map(|(k, v)| {
|
||||
if hard_exclude.contains(k) {
|
||||
if hard_exclude.contains(k) || !v.may_schedule {
|
||||
None
|
||||
} else {
|
||||
Some((*k, *v))
|
||||
Some((*k, v.shard_count))
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
@@ -73,7 +114,18 @@ impl Scheduler {
|
||||
tenant_counts.sort_by_key(|i| (i.1, i.0));
|
||||
|
||||
if tenant_counts.is_empty() {
|
||||
// After applying constraints, no pageservers were left
|
||||
// After applying constraints, no pageservers were left. We log some detail about
|
||||
// the state of nodes to help understand why this happened. This is not logged as an error because
|
||||
// it is legitimately possible for enough nodes to be Offline to prevent scheduling a shard.
|
||||
tracing::info!("Scheduling failure, while excluding {hard_exclude:?}, node states:");
|
||||
for (node_id, node) in &self.nodes {
|
||||
tracing::info!(
|
||||
"Node {node_id}: may_schedule={} shards={}",
|
||||
node.may_schedule,
|
||||
node.shard_count
|
||||
);
|
||||
}
|
||||
|
||||
return Err(ScheduleError::ImpossibleConstraint);
|
||||
}
|
||||
|
||||
@@ -82,7 +134,88 @@ impl Scheduler {
|
||||
"scheduler selected node {node_id} (elegible nodes {:?}, exclude: {hard_exclude:?})",
|
||||
tenant_counts.iter().map(|i| i.0 .0).collect::<Vec<_>>()
|
||||
);
|
||||
*self.tenant_counts.get_mut(&node_id).unwrap() += 1;
|
||||
|
||||
// Note that we do not update shard count here to reflect the scheduling: that
|
||||
// is IntentState's job when the scheduled location is used.
|
||||
|
||||
Ok(node_id)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
|
||||
use utils::id::NodeId;
|
||||
|
||||
use crate::{node::Node, tenant_state::IntentState};
|
||||
|
||||
#[test]
|
||||
fn scheduler_basic() -> anyhow::Result<()> {
|
||||
let mut nodes = HashMap::new();
|
||||
nodes.insert(
|
||||
NodeId(1),
|
||||
Node {
|
||||
id: NodeId(1),
|
||||
availability: NodeAvailability::Active,
|
||||
scheduling: NodeSchedulingPolicy::Active,
|
||||
listen_http_addr: String::new(),
|
||||
listen_http_port: 0,
|
||||
listen_pg_addr: String::new(),
|
||||
listen_pg_port: 0,
|
||||
},
|
||||
);
|
||||
|
||||
nodes.insert(
|
||||
NodeId(2),
|
||||
Node {
|
||||
id: NodeId(2),
|
||||
availability: NodeAvailability::Active,
|
||||
scheduling: NodeSchedulingPolicy::Active,
|
||||
listen_http_addr: String::new(),
|
||||
listen_http_port: 0,
|
||||
listen_pg_addr: String::new(),
|
||||
listen_pg_port: 0,
|
||||
},
|
||||
);
|
||||
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
let mut t1_intent = IntentState::new();
|
||||
let mut t2_intent = IntentState::new();
|
||||
|
||||
let scheduled = scheduler.schedule_shard(&[])?;
|
||||
t1_intent.set_attached(&mut scheduler, Some(scheduled));
|
||||
let scheduled = scheduler.schedule_shard(&[])?;
|
||||
t2_intent.set_attached(&mut scheduler, Some(scheduled));
|
||||
|
||||
assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
|
||||
assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);
|
||||
|
||||
let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers())?;
|
||||
t1_intent.push_secondary(&mut scheduler, scheduled);
|
||||
|
||||
assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
|
||||
assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 2);
|
||||
|
||||
t1_intent.clear(&mut scheduler);
|
||||
assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0);
|
||||
assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);
|
||||
|
||||
if cfg!(debug_assertions) {
|
||||
// Dropping an IntentState without clearing it causes a panic in debug mode,
|
||||
// because we have failed to properly update scheduler shard counts.
|
||||
let result = std::panic::catch_unwind(move || {
|
||||
drop(t2_intent);
|
||||
});
|
||||
assert!(result.is_err());
|
||||
} else {
|
||||
t2_intent.clear(&mut scheduler);
|
||||
assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 0);
|
||||
assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 0);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -69,6 +69,8 @@ struct ServiceState {
|
||||
|
||||
nodes: Arc<HashMap<NodeId, Node>>,
|
||||
|
||||
scheduler: Scheduler,
|
||||
|
||||
compute_hook: Arc<ComputeHook>,
|
||||
|
||||
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
|
||||
@@ -80,14 +82,26 @@ impl ServiceState {
|
||||
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
|
||||
nodes: HashMap<NodeId, Node>,
|
||||
tenants: BTreeMap<TenantShardId, TenantState>,
|
||||
scheduler: Scheduler,
|
||||
) -> Self {
|
||||
Self {
|
||||
tenants,
|
||||
nodes: Arc::new(nodes),
|
||||
scheduler,
|
||||
compute_hook: Arc::new(ComputeHook::new(config)),
|
||||
result_tx,
|
||||
}
|
||||
}
|
||||
|
||||
fn parts_mut(
|
||||
&mut self,
|
||||
) -> (
|
||||
&mut Arc<HashMap<NodeId, Node>>,
|
||||
&mut BTreeMap<TenantShardId, TenantState>,
|
||||
&mut Scheduler,
|
||||
) {
|
||||
(&mut self.nodes, &mut self.tenants, &mut self.scheduler)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -234,19 +248,20 @@ impl Service {
|
||||
// Populate intent and observed states for all tenants, based on reported state on pageservers
|
||||
let (shard_count, nodes) = {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
// Mark nodes online if they responded to us: nodes are offline by default after a restart.
|
||||
let mut nodes = (*locked.nodes).clone();
|
||||
for (node_id, node) in nodes.iter_mut() {
|
||||
let mut new_nodes = (**nodes).clone();
|
||||
for (node_id, node) in new_nodes.iter_mut() {
|
||||
if nodes_online.contains(node_id) {
|
||||
node.availability = NodeAvailability::Active;
|
||||
scheduler.node_upsert(node);
|
||||
}
|
||||
}
|
||||
locked.nodes = Arc::new(nodes);
|
||||
let nodes = locked.nodes.clone();
|
||||
*nodes = Arc::new(new_nodes);
|
||||
|
||||
for (tenant_shard_id, (node_id, observed_loc)) in observed {
|
||||
let Some(tenant_state) = locked.tenants.get_mut(&tenant_shard_id) else {
|
||||
let Some(tenant_state) = tenants.get_mut(&tenant_shard_id) else {
|
||||
cleanup.push((tenant_shard_id, node_id));
|
||||
continue;
|
||||
};
|
||||
@@ -258,10 +273,9 @@ impl Service {
|
||||
}
|
||||
|
||||
// Populate each tenant's intent state
|
||||
let mut scheduler = Scheduler::new(&locked.tenants, &nodes);
|
||||
for (tenant_shard_id, tenant_state) in locked.tenants.iter_mut() {
|
||||
for (tenant_shard_id, tenant_state) in tenants.iter_mut() {
|
||||
tenant_state.intent_from_observed();
|
||||
if let Err(e) = tenant_state.schedule(&mut scheduler) {
|
||||
if let Err(e) = tenant_state.schedule(scheduler) {
|
||||
// Non-fatal error: we are unable to properly schedule the tenant, perhaps because
|
||||
// not enough pageservers are available. The tenant may well still be available
|
||||
// to clients.
|
||||
@@ -276,7 +290,7 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
(locked.tenants.len(), nodes)
|
||||
(tenants.len(), nodes.clone())
|
||||
};
|
||||
|
||||
// TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that
|
||||
@@ -393,7 +407,56 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
/// Apply the contents of a [`ReconcileResult`] to our in-memory state: if the reconciliation
|
||||
/// was successful, this will update the observed state of the tenant such that subsequent
|
||||
/// calls to [`TenantState::maybe_reconcile`] will do nothing.
|
||||
#[instrument(skip_all, fields(
|
||||
tenant_id=%result.tenant_shard_id.tenant_id, shard_id=%result.tenant_shard_id.shard_slug(),
|
||||
sequence=%result.sequence
|
||||
))]
|
||||
fn process_result(&self, result: ReconcileResult) {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let Some(tenant) = locked.tenants.get_mut(&result.tenant_shard_id) else {
|
||||
// A reconciliation result might race with removing a tenant: drop results for
|
||||
// tenants that aren't in our map.
|
||||
return;
|
||||
};
|
||||
|
||||
// Usually generation should only be updated via this path, so the max() isn't
|
||||
// needed, but it is used to handle out-of-band updates via. e.g. test hook.
|
||||
tenant.generation = std::cmp::max(tenant.generation, result.generation);
|
||||
|
||||
// If the reconciler signals that it failed to notify compute, set this state on
|
||||
// the shard so that a future [`TenantState::maybe_reconcile`] will try again.
|
||||
tenant.pending_compute_notification = result.pending_compute_notification;
|
||||
|
||||
match result.result {
|
||||
Ok(()) => {
|
||||
for (node_id, loc) in &result.observed.locations {
|
||||
if let Some(conf) = &loc.conf {
|
||||
tracing::info!("Updating observed location {}: {:?}", node_id, conf);
|
||||
} else {
|
||||
tracing::info!("Setting observed location {} to None", node_id,)
|
||||
}
|
||||
}
|
||||
tenant.observed = result.observed;
|
||||
tenant.waiter.advance(result.sequence);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("Reconcile error: {}", e);
|
||||
|
||||
// Ordering: populate last_error before advancing error_seq,
|
||||
// so that waiters will see the correct error after waiting.
|
||||
*(tenant.last_error.lock().unwrap()) = format!("{e}");
|
||||
tenant.error_waiter.advance(result.sequence);
|
||||
|
||||
for (node_id, o) in result.observed.locations {
|
||||
tenant.observed.locations.insert(node_id, o);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_results(
|
||||
&self,
|
||||
mut result_rx: tokio::sync::mpsc::UnboundedReceiver<ReconcileResult>,
|
||||
@@ -412,55 +475,7 @@ impl Service {
|
||||
}
|
||||
};
|
||||
|
||||
tracing::info!(
|
||||
"Reconcile result for sequence {}, ok={}",
|
||||
result.sequence,
|
||||
result.result.is_ok()
|
||||
);
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let Some(tenant) = locked.tenants.get_mut(&result.tenant_shard_id) else {
|
||||
// A reconciliation result might race with removing a tenant: drop results for
|
||||
// tenants that aren't in our map.
|
||||
continue;
|
||||
};
|
||||
|
||||
// Usually generation should only be updated via this path, so the max() isn't
|
||||
// needed, but it is used to handle out-of-band updates via. e.g. test hook.
|
||||
tenant.generation = std::cmp::max(tenant.generation, result.generation);
|
||||
|
||||
// If the reconciler signals that it failed to notify compute, set this state on
|
||||
// the shard so that a future [`TenantState::maybe_reconcile`] will try again.
|
||||
tenant.pending_compute_notification = result.pending_compute_notification;
|
||||
|
||||
match result.result {
|
||||
Ok(()) => {
|
||||
for (node_id, loc) in &result.observed.locations {
|
||||
if let Some(conf) = &loc.conf {
|
||||
tracing::info!("Updating observed location {}: {:?}", node_id, conf);
|
||||
} else {
|
||||
tracing::info!("Setting observed location {} to None", node_id,)
|
||||
}
|
||||
}
|
||||
tenant.observed = result.observed;
|
||||
tenant.waiter.advance(result.sequence);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
"Reconcile error on tenant {}: {}",
|
||||
tenant.tenant_shard_id,
|
||||
e
|
||||
);
|
||||
|
||||
// Ordering: populate last_error before advancing error_seq,
|
||||
// so that waiters will see the correct error after waiting.
|
||||
*(tenant.last_error.lock().unwrap()) = format!("{e}");
|
||||
tenant.error_waiter.advance(result.sequence);
|
||||
|
||||
for (node_id, o) in result.observed.locations {
|
||||
tenant.observed.locations.insert(node_id, o);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.process_result(result);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -481,6 +496,32 @@ impl Service {
|
||||
|
||||
let mut tenants = BTreeMap::new();
|
||||
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
// Hack: insert scheduler state for all nodes referenced by shards, as compatibility
|
||||
// tests only store the shards, not the nodes. The nodes will be loaded shortly
|
||||
// after when pageservers start up and register.
|
||||
let mut node_ids = HashSet::new();
|
||||
for tsp in &tenant_shard_persistence {
|
||||
node_ids.insert(tsp.generation_pageserver);
|
||||
}
|
||||
for node_id in node_ids {
|
||||
tracing::info!("Creating node {} in scheduler for tests", node_id);
|
||||
let node = Node {
|
||||
id: NodeId(node_id as u64),
|
||||
availability: NodeAvailability::Active,
|
||||
scheduling: NodeSchedulingPolicy::Active,
|
||||
listen_http_addr: "".to_string(),
|
||||
listen_http_port: 123,
|
||||
listen_pg_addr: "".to_string(),
|
||||
listen_pg_port: 123,
|
||||
};
|
||||
|
||||
scheduler.node_upsert(&node);
|
||||
}
|
||||
}
|
||||
for tsp in tenant_shard_persistence {
|
||||
let tenant_shard_id = TenantShardId {
|
||||
tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
|
||||
@@ -501,7 +542,10 @@ impl Service {
|
||||
// it with what we can infer: the node for which a generation was most recently issued.
|
||||
let mut intent = IntentState::new();
|
||||
if tsp.generation_pageserver != i64::MAX {
|
||||
intent.attached = Some(NodeId(tsp.generation_pageserver as u64))
|
||||
intent.set_attached(
|
||||
&mut scheduler,
|
||||
Some(NodeId(tsp.generation_pageserver as u64)),
|
||||
);
|
||||
}
|
||||
|
||||
let new_tenant = TenantState {
|
||||
@@ -532,6 +576,7 @@ impl Service {
|
||||
result_tx,
|
||||
nodes,
|
||||
tenants,
|
||||
scheduler,
|
||||
))),
|
||||
config,
|
||||
persistence,
|
||||
@@ -636,8 +681,9 @@ impl Service {
|
||||
};
|
||||
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let tenant_state = locked
|
||||
.tenants
|
||||
let (_nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
let tenant_state = tenants
|
||||
.get_mut(&attach_req.tenant_shard_id)
|
||||
.expect("Checked for existence above");
|
||||
|
||||
@@ -657,7 +703,7 @@ impl Service {
|
||||
generation = ?tenant_state.generation,
|
||||
"issuing",
|
||||
);
|
||||
} else if let Some(ps_id) = tenant_state.intent.attached {
|
||||
} else if let Some(ps_id) = tenant_state.intent.get_attached() {
|
||||
tracing::info!(
|
||||
tenant_id = %attach_req.tenant_shard_id,
|
||||
%ps_id,
|
||||
@@ -669,7 +715,9 @@ impl Service {
|
||||
tenant_id = %attach_req.tenant_shard_id,
|
||||
"no-op: tenant already has no pageserver");
|
||||
}
|
||||
tenant_state.intent.attached = attach_req.node_id;
|
||||
tenant_state
|
||||
.intent
|
||||
.set_attached(scheduler, attach_req.node_id);
|
||||
|
||||
tracing::info!(
|
||||
"attach_hook: tenant {} set generation {:?}, pageserver {}",
|
||||
@@ -716,7 +764,7 @@ impl Service {
|
||||
InspectResponse {
|
||||
attachment: tenant_state.and_then(|s| {
|
||||
s.intent
|
||||
.attached
|
||||
.get_attached()
|
||||
.map(|ps| (s.generation.into().unwrap(), ps))
|
||||
}),
|
||||
}
|
||||
@@ -862,16 +910,15 @@ impl Service {
|
||||
|
||||
let (waiters, response_shards) = {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (_nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
let mut response_shards = Vec::new();
|
||||
|
||||
let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
|
||||
|
||||
for tenant_shard_id in create_ids {
|
||||
tracing::info!("Creating shard {tenant_shard_id}...");
|
||||
|
||||
use std::collections::btree_map::Entry;
|
||||
match locked.tenants.entry(tenant_shard_id) {
|
||||
match tenants.entry(tenant_shard_id) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
tracing::info!(
|
||||
"Tenant shard {tenant_shard_id} already exists while creating"
|
||||
@@ -881,7 +928,7 @@ impl Service {
|
||||
// attached and secondary locations (independently) away frorm those
|
||||
// pageservers also holding a shard for this tenant.
|
||||
|
||||
entry.get_mut().schedule(&mut scheduler).map_err(|e| {
|
||||
entry.get_mut().schedule(scheduler).map_err(|e| {
|
||||
ApiError::Conflict(format!(
|
||||
"Failed to schedule shard {tenant_shard_id}: {e}"
|
||||
))
|
||||
@@ -892,7 +939,7 @@ impl Service {
|
||||
node_id: entry
|
||||
.get()
|
||||
.intent
|
||||
.attached
|
||||
.get_attached()
|
||||
.expect("We just set pageserver if it was None"),
|
||||
generation: entry.get().generation.into().unwrap(),
|
||||
});
|
||||
@@ -914,7 +961,7 @@ impl Service {
|
||||
}
|
||||
state.config = create_req.config.clone();
|
||||
|
||||
state.schedule(&mut scheduler).map_err(|e| {
|
||||
state.schedule(scheduler).map_err(|e| {
|
||||
ApiError::Conflict(format!(
|
||||
"Failed to schedule shard {tenant_shard_id}: {e}"
|
||||
))
|
||||
@@ -924,7 +971,7 @@ impl Service {
|
||||
shard_id: tenant_shard_id,
|
||||
node_id: state
|
||||
.intent
|
||||
.attached
|
||||
.get_attached()
|
||||
.expect("We just set pageserver if it was None"),
|
||||
generation: state.generation.into().unwrap(),
|
||||
});
|
||||
@@ -1002,16 +1049,11 @@ impl Service {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let result_tx = locked.result_tx.clone();
|
||||
let compute_hook = locked.compute_hook.clone();
|
||||
let pageservers = locked.nodes.clone();
|
||||
|
||||
let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
// Maybe we have existing shards
|
||||
let mut create = true;
|
||||
for (shard_id, shard) in locked
|
||||
.tenants
|
||||
.range_mut(TenantShardId::tenant_range(tenant_id))
|
||||
{
|
||||
for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
|
||||
// Saw an existing shard: this is not a creation
|
||||
create = false;
|
||||
|
||||
@@ -1035,7 +1077,7 @@ impl Service {
|
||||
| LocationConfigMode::AttachedSingle
|
||||
| LocationConfigMode::AttachedStale => {
|
||||
// TODO: persistence for changes in policy
|
||||
if pageservers.len() > 1 {
|
||||
if nodes.len() > 1 {
|
||||
shard.policy = PlacementPolicy::Double(1)
|
||||
} else {
|
||||
// Convenience for dev/test: if we just have one pageserver, import
|
||||
@@ -1045,11 +1087,11 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
shard.schedule(&mut scheduler)?;
|
||||
shard.schedule(scheduler)?;
|
||||
|
||||
let maybe_waiter = shard.maybe_reconcile(
|
||||
result_tx.clone(),
|
||||
&pageservers,
|
||||
nodes,
|
||||
&compute_hook,
|
||||
&self.config,
|
||||
&self.persistence,
|
||||
@@ -1060,10 +1102,10 @@ impl Service {
|
||||
waiters.push(waiter);
|
||||
}
|
||||
|
||||
if let Some(node_id) = shard.intent.attached {
|
||||
if let Some(node_id) = shard.intent.get_attached() {
|
||||
result.shards.push(TenantShardLocation {
|
||||
shard_id: *shard_id,
|
||||
node_id,
|
||||
node_id: *node_id,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1154,7 +1196,7 @@ impl Service {
|
||||
for (tenant_shard_id, shard) in
|
||||
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
|
||||
{
|
||||
let node_id = shard.intent.attached.ok_or_else(|| {
|
||||
let node_id = shard.intent.get_attached().ok_or_else(|| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
|
||||
})?;
|
||||
let node = locked
|
||||
@@ -1211,9 +1253,16 @@ impl Service {
|
||||
// Drop in-memory state
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
locked
|
||||
.tenants
|
||||
.retain(|tenant_shard_id, _shard| tenant_shard_id.tenant_id != tenant_id);
|
||||
let (_nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
// Dereference Scheduler from shards before dropping them
|
||||
for (_tenant_shard_id, shard) in
|
||||
tenants.range_mut(TenantShardId::tenant_range(tenant_id))
|
||||
{
|
||||
shard.intent.clear(scheduler);
|
||||
}
|
||||
|
||||
tenants.retain(|tenant_shard_id, _shard| tenant_shard_id.tenant_id != tenant_id);
|
||||
tracing::info!(
|
||||
"Deleted tenant {tenant_id}, now have {} tenants",
|
||||
locked.tenants.len()
|
||||
@@ -1248,7 +1297,7 @@ impl Service {
|
||||
for (tenant_shard_id, shard) in
|
||||
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
|
||||
{
|
||||
let node_id = shard.intent.attached.ok_or_else(|| {
|
||||
let node_id = shard.intent.get_attached().ok_or_else(|| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
|
||||
})?;
|
||||
let node = locked
|
||||
@@ -1329,7 +1378,7 @@ impl Service {
|
||||
for (tenant_shard_id, shard) in
|
||||
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
|
||||
{
|
||||
let node_id = shard.intent.attached.ok_or_else(|| {
|
||||
let node_id = shard.intent.get_attached().ok_or_else(|| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
|
||||
})?;
|
||||
let node = locked
|
||||
@@ -1401,13 +1450,13 @@ impl Service {
|
||||
|
||||
// TODO: should use the ID last published to compute_hook, rather than the intent: the intent might
|
||||
// point to somewhere we haven't attached yet.
|
||||
let Some(node_id) = shard.intent.attached else {
|
||||
let Some(node_id) = shard.intent.get_attached() else {
|
||||
return Err(ApiError::Conflict(
|
||||
"Cannot call timeline API on non-attached tenant".to_string(),
|
||||
));
|
||||
};
|
||||
|
||||
let Some(node) = locked.nodes.get(&node_id) else {
|
||||
let Some(node) = locked.nodes.get(node_id) else {
|
||||
// This should never happen
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Shard refers to nonexistent node"
|
||||
@@ -1432,12 +1481,13 @@ impl Service {
|
||||
|
||||
for (tenant_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id))
|
||||
{
|
||||
let node_id = shard
|
||||
.intent
|
||||
.attached
|
||||
.ok_or(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Cannot locate a tenant that is not attached"
|
||||
)))?;
|
||||
let node_id =
|
||||
shard
|
||||
.intent
|
||||
.get_attached()
|
||||
.ok_or(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Cannot locate a tenant that is not attached"
|
||||
)))?;
|
||||
|
||||
let node = pageservers
|
||||
.get(&node_id)
|
||||
@@ -1510,106 +1560,104 @@ impl Service {
|
||||
}
|
||||
|
||||
// Validate input, and calculate which shards we will create
|
||||
let (old_shard_count, targets, compute_hook) = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
|
||||
let pageservers = locked.nodes.clone();
|
||||
|
||||
let mut targets = Vec::new();
|
||||
|
||||
// In case this is a retry, count how many already-split shards we found
|
||||
let mut children_found = Vec::new();
|
||||
let mut old_shard_count = None;
|
||||
|
||||
for (tenant_shard_id, shard) in
|
||||
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
|
||||
let (old_shard_count, targets, compute_hook) =
|
||||
{
|
||||
match shard.shard.count.count().cmp(&split_req.new_shard_count) {
|
||||
Ordering::Equal => {
|
||||
// Already split this
|
||||
children_found.push(*tenant_shard_id);
|
||||
continue;
|
||||
}
|
||||
Ordering::Greater => {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Requested count {} but already have shards at count {}",
|
||||
split_req.new_shard_count,
|
||||
shard.shard.count.count()
|
||||
)));
|
||||
}
|
||||
Ordering::Less => {
|
||||
// Fall through: this shard has lower count than requested,
|
||||
// is a candidate for splitting.
|
||||
}
|
||||
}
|
||||
let locked = self.inner.read().unwrap();
|
||||
|
||||
match old_shard_count {
|
||||
None => old_shard_count = Some(shard.shard.count),
|
||||
Some(old_shard_count) => {
|
||||
if old_shard_count != shard.shard.count {
|
||||
// We may hit this case if a caller asked for two splits to
|
||||
// different sizes, before the first one is complete.
|
||||
// e.g. 1->2, 2->4, where the 4 call comes while we have a mixture
|
||||
// of shard_count=1 and shard_count=2 shards in the map.
|
||||
return Err(ApiError::Conflict(
|
||||
"Cannot split, currently mid-split".to_string(),
|
||||
));
|
||||
let pageservers = locked.nodes.clone();
|
||||
|
||||
let mut targets = Vec::new();
|
||||
|
||||
// In case this is a retry, count how many already-split shards we found
|
||||
let mut children_found = Vec::new();
|
||||
let mut old_shard_count = None;
|
||||
|
||||
for (tenant_shard_id, shard) in
|
||||
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
|
||||
{
|
||||
match shard.shard.count.count().cmp(&split_req.new_shard_count) {
|
||||
Ordering::Equal => {
|
||||
// Already split this
|
||||
children_found.push(*tenant_shard_id);
|
||||
continue;
|
||||
}
|
||||
Ordering::Greater => {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Requested count {} but already have shards at count {}",
|
||||
split_req.new_shard_count,
|
||||
shard.shard.count.count()
|
||||
)));
|
||||
}
|
||||
Ordering::Less => {
|
||||
// Fall through: this shard has lower count than requested,
|
||||
// is a candidate for splitting.
|
||||
}
|
||||
}
|
||||
}
|
||||
if policy.is_none() {
|
||||
policy = Some(shard.policy.clone());
|
||||
}
|
||||
if shard_ident.is_none() {
|
||||
shard_ident = Some(shard.shard);
|
||||
}
|
||||
|
||||
if tenant_shard_id.shard_count.count() == split_req.new_shard_count {
|
||||
tracing::info!(
|
||||
"Tenant shard {} already has shard count {}",
|
||||
tenant_shard_id,
|
||||
split_req.new_shard_count
|
||||
);
|
||||
continue;
|
||||
}
|
||||
match old_shard_count {
|
||||
None => old_shard_count = Some(shard.shard.count),
|
||||
Some(old_shard_count) => {
|
||||
if old_shard_count != shard.shard.count {
|
||||
// We may hit this case if a caller asked for two splits to
|
||||
// different sizes, before the first one is complete.
|
||||
// e.g. 1->2, 2->4, where the 4 call comes while we have a mixture
|
||||
// of shard_count=1 and shard_count=2 shards in the map.
|
||||
return Err(ApiError::Conflict(
|
||||
"Cannot split, currently mid-split".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
if policy.is_none() {
|
||||
policy = Some(shard.policy.clone());
|
||||
}
|
||||
if shard_ident.is_none() {
|
||||
shard_ident = Some(shard.shard);
|
||||
}
|
||||
|
||||
let node_id =
|
||||
shard
|
||||
.intent
|
||||
.attached
|
||||
.ok_or(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Cannot split a tenant that is not attached"
|
||||
)))?;
|
||||
if tenant_shard_id.shard_count.count() == split_req.new_shard_count {
|
||||
tracing::info!(
|
||||
"Tenant shard {} already has shard count {}",
|
||||
tenant_shard_id,
|
||||
split_req.new_shard_count
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
let node = pageservers
|
||||
.get(&node_id)
|
||||
.expect("Pageservers may not be deleted while referenced");
|
||||
let node_id = shard.intent.get_attached().ok_or(ApiError::BadRequest(
|
||||
anyhow::anyhow!("Cannot split a tenant that is not attached"),
|
||||
))?;
|
||||
|
||||
// TODO: if any reconciliation is currently in progress for this shard, wait for it.
|
||||
let node = pageservers
|
||||
.get(&node_id)
|
||||
.expect("Pageservers may not be deleted while referenced");
|
||||
|
||||
targets.push(SplitTarget {
|
||||
parent_id: *tenant_shard_id,
|
||||
node: node.clone(),
|
||||
child_ids: tenant_shard_id.split(ShardCount::new(split_req.new_shard_count)),
|
||||
});
|
||||
}
|
||||
// TODO: if any reconciliation is currently in progress for this shard, wait for it.
|
||||
|
||||
if targets.is_empty() {
|
||||
if children_found.len() == split_req.new_shard_count as usize {
|
||||
return Ok(TenantShardSplitResponse {
|
||||
new_shards: children_found,
|
||||
targets.push(SplitTarget {
|
||||
parent_id: *tenant_shard_id,
|
||||
node: node.clone(),
|
||||
child_ids: tenant_shard_id
|
||||
.split(ShardCount::new(split_req.new_shard_count)),
|
||||
});
|
||||
} else {
|
||||
// No shards found to split, and no existing children found: the
|
||||
// tenant doesn't exist at all.
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant {} not found", tenant_id).into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
(old_shard_count, targets, locked.compute_hook.clone())
|
||||
};
|
||||
if targets.is_empty() {
|
||||
if children_found.len() == split_req.new_shard_count as usize {
|
||||
return Ok(TenantShardSplitResponse {
|
||||
new_shards: children_found,
|
||||
});
|
||||
} else {
|
||||
// No shards found to split, and no existing children found: the
|
||||
// tenant doesn't exist at all.
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant {} not found", tenant_id).into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
(old_shard_count, targets, locked.compute_hook.clone())
|
||||
};
|
||||
|
||||
// unwrap safety: we would have returned above if we didn't find at least one shard to split
|
||||
let old_shard_count = old_shard_count.unwrap();
|
||||
@@ -1751,6 +1799,7 @@ impl Service {
|
||||
let mut child_locations = Vec::new();
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (_nodes, tenants, scheduler) = locked.parts_mut();
|
||||
for target in targets {
|
||||
let SplitTarget {
|
||||
parent_id,
|
||||
@@ -1758,19 +1807,14 @@ impl Service {
|
||||
child_ids,
|
||||
} = target;
|
||||
let (pageserver, generation, config) = {
|
||||
let old_state = locked
|
||||
.tenants
|
||||
let mut old_state = tenants
|
||||
.remove(&parent_id)
|
||||
.expect("It was present, we just split it");
|
||||
(
|
||||
old_state.intent.attached.unwrap(),
|
||||
old_state.generation,
|
||||
old_state.config.clone(),
|
||||
)
|
||||
let old_attached = old_state.intent.get_attached().unwrap();
|
||||
old_state.intent.clear(scheduler);
|
||||
(old_attached, old_state.generation, old_state.config.clone())
|
||||
};
|
||||
|
||||
locked.tenants.remove(&parent_id);
|
||||
|
||||
for child in child_ids {
|
||||
let mut child_shard = shard_ident;
|
||||
child_shard.number = child.shard_number;
|
||||
@@ -1785,7 +1829,7 @@ impl Service {
|
||||
);
|
||||
|
||||
let mut child_state = TenantState::new(child, child_shard, policy.clone());
|
||||
child_state.intent = IntentState::single(Some(pageserver));
|
||||
child_state.intent = IntentState::single(scheduler, Some(pageserver));
|
||||
child_state.observed = ObservedState {
|
||||
locations: child_observed,
|
||||
};
|
||||
@@ -1798,7 +1842,7 @@ impl Service {
|
||||
|
||||
child_locations.push((child, pageserver));
|
||||
|
||||
locked.tenants.insert(child, child_state);
|
||||
tenants.insert(child, child_state);
|
||||
response.new_shards.push(child);
|
||||
}
|
||||
}
|
||||
@@ -1834,35 +1878,34 @@ impl Service {
|
||||
) -> Result<TenantShardMigrateResponse, ApiError> {
|
||||
let waiter = {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
|
||||
let result_tx = locked.result_tx.clone();
|
||||
let pageservers = locked.nodes.clone();
|
||||
let compute_hook = locked.compute_hook.clone();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else {
|
||||
let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant shard not found").into(),
|
||||
));
|
||||
};
|
||||
|
||||
if shard.intent.attached == Some(migrate_req.node_id) {
|
||||
if shard.intent.get_attached() == &Some(migrate_req.node_id) {
|
||||
// No-op case: we will still proceed to wait for reconciliation in case it is
|
||||
// incomplete from an earlier update to the intent.
|
||||
tracing::info!("Migrating: intent is unchanged {:?}", shard.intent);
|
||||
} else {
|
||||
let old_attached = shard.intent.attached;
|
||||
let old_attached = *shard.intent.get_attached();
|
||||
|
||||
match shard.policy {
|
||||
PlacementPolicy::Single => {
|
||||
shard.intent.secondary.clear();
|
||||
shard.intent.clear_secondary(scheduler);
|
||||
}
|
||||
PlacementPolicy::Double(_n) => {
|
||||
// If our new attached node was a secondary, it no longer should be.
|
||||
shard.intent.secondary.retain(|s| s != &migrate_req.node_id);
|
||||
shard.intent.remove_secondary(scheduler, migrate_req.node_id);
|
||||
|
||||
// If we were already attached to something, demote that to a secondary
|
||||
if let Some(old_attached) = old_attached {
|
||||
shard.intent.secondary.push(old_attached);
|
||||
shard.intent.push_secondary(scheduler, old_attached);
|
||||
}
|
||||
}
|
||||
PlacementPolicy::Detached => {
|
||||
@@ -1871,7 +1914,9 @@ impl Service {
|
||||
)))
|
||||
}
|
||||
}
|
||||
shard.intent.attached = Some(migrate_req.node_id);
|
||||
shard
|
||||
.intent
|
||||
.set_attached(scheduler, Some(migrate_req.node_id));
|
||||
|
||||
tracing::info!("Migrating: new intent {:?}", shard.intent);
|
||||
shard.sequence = shard.sequence.next();
|
||||
@@ -1879,7 +1924,7 @@ impl Service {
|
||||
|
||||
shard.maybe_reconcile(
|
||||
result_tx,
|
||||
&pageservers,
|
||||
nodes,
|
||||
&compute_hook,
|
||||
&self.config,
|
||||
&self.persistence,
|
||||
@@ -1903,13 +1948,16 @@ impl Service {
|
||||
self.persistence.delete_tenant(tenant_id).await?;
|
||||
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (_nodes, tenants, scheduler) = locked.parts_mut();
|
||||
let mut shards = Vec::new();
|
||||
for (tenant_shard_id, _) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) {
|
||||
for (tenant_shard_id, _) in tenants.range(TenantShardId::tenant_range(tenant_id)) {
|
||||
shards.push(*tenant_shard_id);
|
||||
}
|
||||
|
||||
for shard in shards {
|
||||
locked.tenants.remove(&shard);
|
||||
for shard_id in shards {
|
||||
if let Some(mut shard) = tenants.remove(&shard_id) {
|
||||
shard.intent.clear(scheduler);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -2004,6 +2052,7 @@ impl Service {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let mut new_nodes = (*locked.nodes).clone();
|
||||
|
||||
locked.scheduler.node_upsert(&new_node);
|
||||
new_nodes.insert(register_req.node_id, new_node);
|
||||
|
||||
locked.nodes = Arc::new(new_nodes);
|
||||
@@ -2020,8 +2069,9 @@ impl Service {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let result_tx = locked.result_tx.clone();
|
||||
let compute_hook = locked.compute_hook.clone();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
let mut new_nodes = (*locked.nodes).clone();
|
||||
let mut new_nodes = (**nodes).clone();
|
||||
|
||||
let Some(node) = new_nodes.get_mut(&config_req.node_id) else {
|
||||
return Err(ApiError::NotFound(
|
||||
@@ -2057,11 +2107,14 @@ impl Service {
|
||||
// to wake up and start working.
|
||||
}
|
||||
|
||||
// Update the scheduler, in case the elegibility of the node for new shards has changed
|
||||
scheduler.node_upsert(node);
|
||||
|
||||
let new_nodes = Arc::new(new_nodes);
|
||||
|
||||
let mut scheduler = Scheduler::new(&locked.tenants, &new_nodes);
|
||||
if offline_transition {
|
||||
for (tenant_shard_id, tenant_state) in &mut locked.tenants {
|
||||
let mut tenants_affected: usize = 0;
|
||||
for (tenant_shard_id, tenant_state) in tenants {
|
||||
if let Some(observed_loc) =
|
||||
tenant_state.observed.locations.get_mut(&config_req.node_id)
|
||||
{
|
||||
@@ -2072,7 +2125,7 @@ impl Service {
|
||||
|
||||
if tenant_state.intent.notify_offline(config_req.node_id) {
|
||||
tenant_state.sequence = tenant_state.sequence.next();
|
||||
match tenant_state.schedule(&mut scheduler) {
|
||||
match tenant_state.schedule(scheduler) {
|
||||
Err(e) => {
|
||||
// It is possible that some tenants will become unschedulable when too many pageservers
|
||||
// go offline: in this case there isn't much we can do other than make the issue observable.
|
||||
@@ -2080,19 +2133,29 @@ impl Service {
|
||||
tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", config_req.node_id);
|
||||
}
|
||||
Ok(()) => {
|
||||
tenant_state.maybe_reconcile(
|
||||
result_tx.clone(),
|
||||
&new_nodes,
|
||||
&compute_hook,
|
||||
&self.config,
|
||||
&self.persistence,
|
||||
&self.gate,
|
||||
&self.cancel,
|
||||
);
|
||||
if tenant_state
|
||||
.maybe_reconcile(
|
||||
result_tx.clone(),
|
||||
&new_nodes,
|
||||
&compute_hook,
|
||||
&self.config,
|
||||
&self.persistence,
|
||||
&self.gate,
|
||||
&self.cancel,
|
||||
)
|
||||
.is_some()
|
||||
{
|
||||
tenants_affected += 1;
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
tracing::info!(
|
||||
"Launched {} reconciler tasks for tenants affected by node {} going offline",
|
||||
tenants_affected,
|
||||
config_req.node_id
|
||||
)
|
||||
}
|
||||
|
||||
if active_transition {
|
||||
@@ -2135,18 +2198,14 @@ impl Service {
|
||||
let mut waiters = Vec::new();
|
||||
let result_tx = locked.result_tx.clone();
|
||||
let compute_hook = locked.compute_hook.clone();
|
||||
let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
|
||||
let pageservers = locked.nodes.clone();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
for (_tenant_shard_id, shard) in locked
|
||||
.tenants
|
||||
.range_mut(TenantShardId::tenant_range(tenant_id))
|
||||
{
|
||||
shard.schedule(&mut scheduler)?;
|
||||
for (_tenant_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
|
||||
shard.schedule(scheduler)?;
|
||||
|
||||
if let Some(waiter) = shard.maybe_reconcile(
|
||||
result_tx.clone(),
|
||||
&pageservers,
|
||||
nodes,
|
||||
&compute_hook,
|
||||
&self.config,
|
||||
&self.persistence,
|
||||
|
||||
@@ -19,7 +19,9 @@ use crate::{
|
||||
compute_hook::ComputeHook,
|
||||
node::Node,
|
||||
persistence::{split_state::SplitState, Persistence},
|
||||
reconciler::{attached_location_conf, secondary_location_conf, ReconcileError, Reconciler},
|
||||
reconciler::{
|
||||
attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
|
||||
},
|
||||
scheduler::{ScheduleError, Scheduler},
|
||||
service, PlacementPolicy, Sequence,
|
||||
};
|
||||
@@ -88,8 +90,107 @@ pub(crate) struct TenantState {
|
||||
|
||||
#[derive(Default, Clone, Debug)]
|
||||
pub(crate) struct IntentState {
|
||||
pub(crate) attached: Option<NodeId>,
|
||||
pub(crate) secondary: Vec<NodeId>,
|
||||
attached: Option<NodeId>,
|
||||
secondary: Vec<NodeId>,
|
||||
}
|
||||
|
||||
impl IntentState {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
attached: None,
|
||||
secondary: vec![],
|
||||
}
|
||||
}
|
||||
pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
|
||||
if let Some(node_id) = node_id {
|
||||
scheduler.node_inc_ref(node_id);
|
||||
}
|
||||
Self {
|
||||
attached: node_id,
|
||||
secondary: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
|
||||
if self.attached != new_attached {
|
||||
if let Some(old_attached) = self.attached.take() {
|
||||
scheduler.node_dec_ref(old_attached);
|
||||
}
|
||||
if let Some(new_attached) = &new_attached {
|
||||
scheduler.node_inc_ref(*new_attached);
|
||||
}
|
||||
self.attached = new_attached;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
|
||||
debug_assert!(!self.secondary.contains(&new_secondary));
|
||||
scheduler.node_inc_ref(new_secondary);
|
||||
self.secondary.push(new_secondary);
|
||||
}
|
||||
|
||||
/// It is legal to call this with a node that is not currently a secondary: that is a no-op
|
||||
pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
|
||||
let index = self.secondary.iter().position(|n| *n == node_id);
|
||||
if let Some(index) = index {
|
||||
scheduler.node_dec_ref(node_id);
|
||||
self.secondary.remove(index);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
|
||||
for secondary in self.secondary.drain(..) {
|
||||
scheduler.node_dec_ref(secondary);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
|
||||
if let Some(old_attached) = self.attached.take() {
|
||||
scheduler.node_dec_ref(old_attached);
|
||||
}
|
||||
|
||||
self.clear_secondary(scheduler);
|
||||
}
|
||||
|
||||
pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
|
||||
let mut result = Vec::new();
|
||||
if let Some(p) = self.attached {
|
||||
result.push(p)
|
||||
}
|
||||
|
||||
result.extend(self.secondary.iter().copied());
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
pub(crate) fn get_attached(&self) -> &Option<NodeId> {
|
||||
&self.attached
|
||||
}
|
||||
|
||||
pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
|
||||
&self.secondary
|
||||
}
|
||||
|
||||
/// When a node goes offline, we update intents to avoid using it
|
||||
/// as their attached pageserver.
|
||||
///
|
||||
/// Returns true if a change was made
|
||||
pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool {
|
||||
if self.attached == Some(node_id) {
|
||||
self.attached = None;
|
||||
self.secondary.push(node_id);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for IntentState {
|
||||
fn drop(&mut self) {
|
||||
// Must clear before dropping, to avoid leaving stale refcounts in the Scheduler
|
||||
debug_assert!(self.attached.is_none() && self.secondary.is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
@@ -182,46 +283,6 @@ pub(crate) struct ReconcileResult {
|
||||
pub(crate) pending_compute_notification: bool,
|
||||
}
|
||||
|
||||
impl IntentState {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
attached: None,
|
||||
secondary: vec![],
|
||||
}
|
||||
}
|
||||
pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
|
||||
let mut result = Vec::new();
|
||||
if let Some(p) = self.attached {
|
||||
result.push(p)
|
||||
}
|
||||
|
||||
result.extend(self.secondary.iter().copied());
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
pub(crate) fn single(node_id: Option<NodeId>) -> Self {
|
||||
Self {
|
||||
attached: node_id,
|
||||
secondary: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
/// When a node goes offline, we update intents to avoid using it
|
||||
/// as their attached pageserver.
|
||||
///
|
||||
/// Returns true if a change was made
|
||||
pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool {
|
||||
if self.attached == Some(node_id) {
|
||||
self.attached = None;
|
||||
self.secondary.push(node_id);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ObservedState {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
@@ -315,12 +376,12 @@ impl TenantState {
|
||||
// Should have exactly one attached, and zero secondaries
|
||||
if self.intent.attached.is_none() {
|
||||
let node_id = scheduler.schedule_shard(&used_pageservers)?;
|
||||
self.intent.attached = Some(node_id);
|
||||
self.intent.set_attached(scheduler, Some(node_id));
|
||||
used_pageservers.push(node_id);
|
||||
modified = true;
|
||||
}
|
||||
if !self.intent.secondary.is_empty() {
|
||||
self.intent.secondary.clear();
|
||||
self.intent.clear_secondary(scheduler);
|
||||
modified = true;
|
||||
}
|
||||
}
|
||||
@@ -328,14 +389,14 @@ impl TenantState {
|
||||
// Should have exactly one attached, and N secondaries
|
||||
if self.intent.attached.is_none() {
|
||||
let node_id = scheduler.schedule_shard(&used_pageservers)?;
|
||||
self.intent.attached = Some(node_id);
|
||||
self.intent.set_attached(scheduler, Some(node_id));
|
||||
used_pageservers.push(node_id);
|
||||
modified = true;
|
||||
}
|
||||
|
||||
while self.intent.secondary.len() < secondary_count {
|
||||
let node_id = scheduler.schedule_shard(&used_pageservers)?;
|
||||
self.intent.secondary.push(node_id);
|
||||
self.intent.push_secondary(scheduler, node_id);
|
||||
used_pageservers.push(node_id);
|
||||
modified = true;
|
||||
}
|
||||
@@ -343,12 +404,12 @@ impl TenantState {
|
||||
Detached => {
|
||||
// Should have no attached or secondary pageservers
|
||||
if self.intent.attached.is_some() {
|
||||
self.intent.attached = None;
|
||||
self.intent.set_attached(scheduler, None);
|
||||
modified = true;
|
||||
}
|
||||
|
||||
if !self.intent.secondary.is_empty() {
|
||||
self.intent.secondary.clear();
|
||||
self.intent.clear_secondary(scheduler);
|
||||
modified = true;
|
||||
}
|
||||
}
|
||||
@@ -490,7 +551,7 @@ impl TenantState {
|
||||
tenant_shard_id: self.tenant_shard_id,
|
||||
shard: self.shard,
|
||||
generation: self.generation,
|
||||
intent: self.intent.clone(),
|
||||
intent: TargetState::from_intent(&self.intent),
|
||||
config: self.config.clone(),
|
||||
observed: self.observed.clone(),
|
||||
pageservers: pageservers.clone(),
|
||||
|
||||
Reference in New Issue
Block a user