Mirror of https://github.com/neondatabase/neon.git (synced 2026-03-05 01:10:38 +00:00)

Compare commits: 18 commits, split-prox ... vlad/tmp/g
| SHA1 |
|---|
| fce602ed30 |
| fc24ba5233 |
| e200a2b01e |
| 2a7f224306 |
| d86ddf2b76 |
| 86d5f4ada9 |
| 089edb55e8 |
| 1302f9442a |
| 80612d2688 |
| 7f96ac3435 |
| 999fbbb2a3 |
| d22e0b5398 |
| 58340f9dbf |
| fcbac527b0 |
| a5154cf990 |
| bfe5df8c4e |
| 46927bc228 |
| bb9c792813 |
Cargo.lock (generated): 1 change

@@ -5801,6 +5801,7 @@ dependencies = [
 "r2d2",
 "reqwest 0.12.4",
 "routerify",
 "scopeguard",
 "serde",
 "serde_json",
 "strum",
@@ -209,6 +209,7 @@ pub enum NodeSchedulingPolicy {
    Active,
    Filling,
    Pause,
    PauseForRestart,
    Draining,
}

@@ -220,6 +221,7 @@ impl FromStr for NodeSchedulingPolicy {
    "active" => Ok(Self::Active),
    "filling" => Ok(Self::Filling),
    "pause" => Ok(Self::Pause),
    "pause_for_restart" => Ok(Self::PauseForRestart),
    "draining" => Ok(Self::Draining),
    _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
}

@@ -233,6 +235,7 @@ impl From<NodeSchedulingPolicy> for String {
    Active => "active",
    Filling => "filling",
    Pause => "pause",
    PauseForRestart => "pause_for_restart",
    Draining => "draining",
}
.to_string()
@@ -40,6 +40,7 @@ tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
measured.workspace = true
scopeguard.workspace = true
strum.workspace = true
strum_macros.workspace = true
storage_controller/src/background_node_operations.rs (new file, 57 lines)

@@ -0,0 +1,57 @@
use std::{borrow::Cow, fmt::Debug, fmt::Display};

use tokio_util::sync::CancellationToken;
use utils::id::NodeId;

pub(crate) const MAX_RECONCILES_PER_OPERATION: usize = 10;

#[derive(Copy, Clone)]
pub(crate) struct Drain {
    pub(crate) node_id: NodeId,
}

#[derive(Copy, Clone)]
pub(crate) struct Fill {
    pub(crate) node_id: NodeId,
}

#[derive(Copy, Clone)]
pub(crate) enum Operation {
    Drain(Drain),
    Fill(Fill),
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum OperationError {
    #[error("Node state changed during operation: {0}")]
    NodeStateChanged(Cow<'static, str>),

    #[error("Operation cancelled")]
    Cancelled,
}

pub(crate) struct OperationHandler {
    pub(crate) operation: Operation,
    #[allow(unused)]
    pub(crate) cancel: CancellationToken,
}

impl Display for Drain {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "drain {}", self.node_id)
    }
}

impl Display for Fill {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "fill {}", self.node_id)
    }
}

impl Display for Operation {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Operation::Drain(op) => write!(f, "{op}"),
            Operation::Fill(op) => write!(f, "{op}"),
        }
    }
}
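These `Display` implementations are what the service later leans on when it rejects a conflicting request, producing messages such as "Background operation already ongoing for node: drain 1". A self-contained sketch of that rendering, with a hypothetical stand-in for the crate's `NodeId`:

```rust
use std::fmt::{self, Display};

// Hypothetical stand-in for utils::id::NodeId, which is Display in the real crate.
#[derive(Copy, Clone)]
struct NodeId(u64);

impl Display for NodeId {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

#[derive(Copy, Clone)]
struct Drain { node_id: NodeId }

#[derive(Copy, Clone)]
struct Fill { node_id: NodeId }

#[derive(Copy, Clone)]
enum Operation {
    Drain(Drain),
    Fill(Fill),
}

impl Display for Operation {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Operation::Drain(op) => write!(f, "drain {}", op.node_id),
            Operation::Fill(op) => write!(f, "fill {}", op.node_id),
        }
    }
}

fn main() {
    // Only one background operation may exist at a time; the service keeps it in an
    // Option and formats it into precondition-failure messages like the one below.
    let ongoing = Some(Operation::Drain(Drain { node_id: NodeId(1) }));
    if let Some(op) = ongoing {
        println!("Background operation already ongoing for node: {op}");
    }
    let _ = Operation::Fill(Fill { node_id: NodeId(2) });
}
```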
@@ -480,6 +480,39 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>,
    )
}

async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;

    let state = get_state(&req);
    let node_id: NodeId = parse_request_param(&req, "node_id")?;

    let node_status = state.service.get_node(node_id).await?;

    json_response(StatusCode::OK, node_status)
}

async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;

    let state = get_state(&req);
    let node_id: NodeId = parse_request_param(&req, "node_id")?;

    state.service.start_node_drain(node_id).await?;

    json_response(StatusCode::ACCEPTED, ())
}

async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permissions(&req, Scope::Admin)?;

    let state = get_state(&req);
    let node_id: NodeId = parse_request_param(&req, "node_id")?;

    state.service.start_node_fill(node_id).await?;

    json_response(StatusCode::ACCEPTED, ())
}

async fn handle_tenant_shard_split(
    service: Arc<Service>,
    mut req: Request<Body>,

@@ -832,6 +865,16 @@ pub fn make_router(
                RequestName("control_v1_node_config"),
            )
        })
        .get("/control/v1/node/:node_id", |r| {
            named_request_span(r, handle_node_status, RequestName("control_v1_node_status"))
        })
        .put("/control/v1/node/:node_id/drain", |r| {
            named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
        })
        .put("/control/v1/node/:node_id/fill", |r| {
            named_request_span(r, handle_node_fill, RequestName("control_v1_node_fill"))
        })
        // TODO(vlad): endpoint for cancelling drain and fill
        // Tenant Shard operations
        .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
            tenant_service_handler(
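Operationally, a rolling restart drives these endpoints in sequence: PUT the drain endpoint, poll the node status until its scheduling policy reaches `PauseForRestart`, restart the pageserver, then PUT the fill endpoint (the Python test later in this diff does exactly this). A rough client-side sketch, assuming a `reqwest` dependency with the `blocking` feature, a hypothetical base URL, and an admin JWT accepted as a bearer token:

```rust
use std::{thread::sleep, time::Duration};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumptions: controller listening locally, node id 1, admin token in the environment.
    let base = "http://127.0.0.1:1234"; // hypothetical base URL
    let node_id = 1u64;
    let token = std::env::var("STORCON_ADMIN_TOKEN")?;

    let client = reqwest::blocking::Client::new();

    // Kick off the drain; the handler returns 202 Accepted and the work runs in the background.
    let resp = client
        .put(format!("{base}/control/v1/node/{node_id}/drain"))
        .bearer_auth(&token)
        .send()?;
    assert!(resp.status().is_success());

    // Poll the node status endpoint until the drain finishes (policy becomes PauseForRestart).
    loop {
        let status_body = client
            .get(format!("{base}/control/v1/node/{node_id}"))
            .bearer_auth(&token)
            .send()?
            .text()?;
        // The body is JSON; checking for the policy name as a substring keeps this
        // sketch free of a JSON dependency.
        if status_body.contains("PauseForRestart") {
            break;
        }
        sleep(Duration::from_secs(5));
    }

    // ... restart the pageserver, then PUT {base}/control/v1/node/{node_id}/fill ...
    Ok(())
}
```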
@@ -2,6 +2,7 @@ use serde::Serialize;
use utils::seqwait::MonotonicCounter;

mod auth;
mod background_node_operations;
mod compute_hook;
mod heartbeater;
pub mod http;
@@ -59,6 +59,10 @@ impl Node {
        self.id
    }

    pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy {
        self.scheduling
    }

    pub(crate) fn set_scheduling(&mut self, scheduling: NodeSchedulingPolicy) {
        self.scheduling = scheduling
    }

@@ -141,6 +145,7 @@ impl Node {
        NodeSchedulingPolicy::Draining => MaySchedule::No,
        NodeSchedulingPolicy::Filling => MaySchedule::Yes(score),
        NodeSchedulingPolicy::Pause => MaySchedule::No,
        NodeSchedulingPolicy::PauseForRestart => MaySchedule::No,
    }
}

@@ -157,7 +162,7 @@ impl Node {
    listen_http_port,
    listen_pg_addr,
    listen_pg_port,
    scheduling: NodeSchedulingPolicy::Filling,
    scheduling: NodeSchedulingPolicy::Active,
    availability: NodeAvailability::Offline,
    cancel: CancellationToken::new(),
}
@@ -442,13 +442,15 @@ impl Persistence {
    #[tracing::instrument(skip_all, fields(node_id))]
    pub(crate) async fn re_attach(
        &self,
        node_id: NodeId,
        input_node_id: NodeId,
    ) -> DatabaseResult<HashMap<TenantShardId, Generation>> {
        use crate::schema::nodes::dsl::scheduling_policy;
        use crate::schema::nodes::dsl::*;
        use crate::schema::tenant_shards::dsl::*;
        let updated = self
            .with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
                let rows_updated = diesel::update(tenant_shards)
                    .filter(generation_pageserver.eq(node_id.0 as i64))
                    .filter(generation_pageserver.eq(input_node_id.0 as i64))
                    .set(generation.eq(generation + 1))
                    .execute(conn)?;

@@ -457,9 +459,23 @@ impl Persistence {
                // TODO: UPDATE+SELECT in one query

                let updated = tenant_shards
                    .filter(generation_pageserver.eq(node_id.0 as i64))
                    .filter(generation_pageserver.eq(input_node_id.0 as i64))
                    .select(TenantShardPersistence::as_select())
                    .load(conn)?;

                // If the node went through a drain and restart phase before re-attaching,
                // then reset its node scheduling policy to active.
                diesel::update(nodes)
                    .filter(node_id.eq(input_node_id.0 as i64))
                    .filter(
                        scheduling_policy
                            .eq(String::from(NodeSchedulingPolicy::PauseForRestart))
                            .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Draining)))
                            .or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Filling))),
                    )
                    .set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active)))
                    .execute(conn)?;

                Ok(updated)
            })
            .await?;
@@ -1,4 +1,5 @@
use crate::{node::Node, tenant_shard::TenantShard};
use itertools::Itertools;
use pageserver_api::controller_api::UtilizationScore;
use serde::Serialize;
use std::collections::HashMap;

@@ -283,6 +284,44 @@ impl Scheduler {
        }
    }

    // Check if the number of shards attached to a given node is lagging below
    // the cluster average. If that's the case, the node should be filled.
    pub(crate) fn compute_fill_requirement(&self, node_id: NodeId) -> usize {
        let Some(node) = self.nodes.get(&node_id) else {
            debug_assert!(false);
            tracing::error!("Scheduler missing node {node_id}");
            return 0;
        };
        assert!(!self.nodes.is_empty());
        let expected_attached_shards_per_node = self.expected_attached_shard_count();

        for (node_id, node) in self.nodes.iter() {
            tracing::trace!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node);
        }

        if node.attached_shard_count < expected_attached_shards_per_node {
            expected_attached_shards_per_node - node.attached_shard_count
        } else {
            0
        }
    }

    pub(crate) fn expected_attached_shard_count(&self) -> usize {
        let total_attached_shards: usize =
            self.nodes.values().map(|n| n.attached_shard_count).sum();

        assert!(!self.nodes.is_empty());
        total_attached_shards / self.nodes.len()
    }

    pub(crate) fn nodes_by_attached_shard_count(&self) -> Vec<(NodeId, usize)> {
        self.nodes
            .iter()
            .map(|(node_id, stats)| (*node_id, stats.attached_shard_count))
            .sorted_by(|lhs, rhs| Ord::cmp(&lhs.1, &rhs.1).reverse())
            .collect()
    }

    pub(crate) fn node_upsert(&mut self, node: &Node) {
        use std::collections::hash_map::Entry::*;
        match self.nodes.entry(node.get_id()) {
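To make the arithmetic concrete: `expected_attached_shard_count` is the integer mean of attached shards across all nodes, and `compute_fill_requirement` measures how far a given node sits below that mean. A small, self-contained sketch with hypothetical counts (not taken from this diff):

```rust
use std::collections::HashMap;

// Attached shard count per node id; imagine node 3 just restarted after a drain.
fn expected_attached_shard_count(counts: &HashMap<u64, usize>) -> usize {
    assert!(!counts.is_empty());
    counts.values().sum::<usize>() / counts.len()
}

fn compute_fill_requirement(counts: &HashMap<u64, usize>, node_id: u64) -> usize {
    let expected = expected_attached_shard_count(counts);
    let attached = counts.get(&node_id).copied().unwrap_or(0);
    expected.saturating_sub(attached)
}

fn main() {
    let counts = HashMap::from([(1u64, 14), (2, 10), (3, 0)]);
    // total = 24 attachments over 3 nodes => expected = 8 per node
    assert_eq!(expected_attached_shard_count(&counts), 8);
    // node 3 holds 0 attached shards, so a fill should move 8 attachments onto it
    assert_eq!(compute_fill_requirement(&counts, 3), 8);
    // node 1 is above the average, so it needs no filling
    assert_eq!(compute_fill_requirement(&counts, 1), 0);
}
```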
@@ -8,13 +8,17 @@ use std::{
};

use crate::{
    background_node_operations::{
        Drain, Fill, Operation, OperationError, OperationHandler, MAX_RECONCILES_PER_OPERATION,
    },
    compute_hook::NotifyError,
    id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, WrappedWriteGuard},
    persistence::{AbortShardSplitStatus, TenantFilter},
    reconciler::{ReconcileError, ReconcileUnits},
    scheduler::{ScheduleContext, ScheduleMode},
    scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
    tenant_shard::{
        MigrateAttachment, ReconcileNeeded, ScheduleOptimization, ScheduleOptimizationAction,
        MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization,
        ScheduleOptimizationAction,
    },
};
use anyhow::Context;

@@ -134,6 +138,8 @@ struct ServiceState {

    scheduler: Scheduler,

    ongoing_operation: Option<OperationHandler>,

    /// Queue of tenants who are waiting for concurrency limits to permit them to reconcile
    delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
}

@@ -185,6 +191,7 @@ impl ServiceState {
        tenants,
        nodes: Arc::new(nodes),
        scheduler,
        ongoing_operation: None,
        delayed_reconcile_rx,
    }
}

@@ -296,6 +303,17 @@ impl From<ReconcileWaitError> for ApiError {
    }
}

impl From<OperationError> for ApiError {
    fn from(value: OperationError) -> Self {
        match value {
            OperationError::NodeStateChanged(err) => {
                ApiError::InternalServerError(anyhow::anyhow!(err))
            }
            OperationError::Cancelled => ApiError::Conflict("Operation was cancelled".into()),
        }
    }
}

#[allow(clippy::large_enum_variant)]
enum TenantCreateOrUpdate {
    Create(TenantCreateRequest),
@@ -1562,15 +1580,32 @@ impl Service {
    // Setting a node active unblocks any Reconcilers that might write to the location config API,
    // but those requests will not be accepted by the node until it has finished processing
    // the re-attach response.
    //
    // Additionally, reset the node's scheduling policy to match the conditional update done
    // in [`Persistence::re_attach`].
    if let Some(node) = nodes.get(&reattach_req.node_id) {
        if !node.is_available() {
        let reset_scheduling = matches!(
            node.get_scheduling(),
            NodeSchedulingPolicy::PauseForRestart
                | NodeSchedulingPolicy::Draining
                | NodeSchedulingPolicy::Filling
        );

        if !node.is_available() || reset_scheduling {
            let mut new_nodes = (**nodes).clone();
            if let Some(node) = new_nodes.get_mut(&reattach_req.node_id) {
                node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
                if !node.is_available() {
                    node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
                }

                if reset_scheduling {
                    node.set_scheduling(NodeSchedulingPolicy::Active);
                }

                scheduler.node_upsert(node);
                let new_nodes = Arc::new(new_nodes);
                *nodes = new_nodes;
            }
            let new_nodes = Arc::new(new_nodes);
            *nodes = new_nodes;
        }
    }
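The in-memory reset mirrors the conditional update that `Persistence::re_attach` performs in the database: any of the transient policies (`PauseForRestart`, `Draining`, `Filling`) collapses back to `Active` when the node re-attaches. A minimal sketch of just that decision, using a local stand-in enum rather than the real `pageserver_api` type:

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum NodeSchedulingPolicy { Active, Filling, Pause, PauseForRestart, Draining }

fn policy_after_reattach(current: NodeSchedulingPolicy) -> NodeSchedulingPolicy {
    // Same predicate as the `reset_scheduling` check above and the SQL-side filter in re_attach.
    let reset = matches!(
        current,
        NodeSchedulingPolicy::PauseForRestart
            | NodeSchedulingPolicy::Draining
            | NodeSchedulingPolicy::Filling
    );
    if reset { NodeSchedulingPolicy::Active } else { current }
}

fn main() {
    assert_eq!(
        policy_after_reattach(NodeSchedulingPolicy::PauseForRestart),
        NodeSchedulingPolicy::Active
    );
    // A manually paused node stays paused across a re-attach.
    assert_eq!(
        policy_after_reattach(NodeSchedulingPolicy::Pause),
        NodeSchedulingPolicy::Pause
    );
}
```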
@@ -1851,6 +1886,25 @@ impl Service {
        Ok(())
    }

    /// Same as [`Service::await_waiters`], but returns the waiters which are still
    /// in progress
    async fn await_waiters_remainder(
        &self,
        waiters: Vec<ReconcilerWaiter>,
        timeout: Duration,
    ) -> Vec<ReconcilerWaiter> {
        let deadline = Instant::now().checked_add(timeout).unwrap();
        for waiter in waiters.iter() {
            let timeout = deadline.duration_since(Instant::now());
            let _ = waiter.wait_timeout(timeout).await;
        }

        waiters
            .into_iter()
            .filter(|waiter| matches!(waiter.get_status(), ReconcilerStatus::InProgress))
            .collect::<Vec<_>>()
    }

    /// Part of [`Self::tenant_location_config`]: dissect an incoming location config request,
    /// and transform it into either a tenant creation or a series of shard updates.
    ///
@@ -4128,6 +4182,18 @@ impl Service {
        Ok(nodes)
    }

    pub(crate) async fn get_node(&self, node_id: NodeId) -> Result<Node, ApiError> {
        self.inner
            .read()
            .unwrap()
            .nodes
            .get(&node_id)
            .cloned()
            .ok_or(ApiError::NotFound(
                format!("Node {node_id} not registered").into(),
            ))
    }

    pub(crate) async fn node_register(
        &self,
        register_req: NodeRegisterRequest,
@@ -4282,9 +4348,6 @@ impl Service {

        if let Some(scheduling) = scheduling {
            node.set_scheduling(scheduling);

            // TODO: once we have a background scheduling ticker for fill/drain, kick it
            // to wake up and start working.
        }

        // Update the scheduler, in case the eligibility of the node for new shards has changed

@@ -4359,7 +4422,7 @@ impl Service {
            // TODO: in the background, we should balance work back onto this pageserver
        }
        AvailabilityTransition::Unchanged => {
            tracing::debug!("Node {} no change during config", node_id);
            tracing::debug!("Node {} no availability change during config", node_id);
        }
    }
@@ -4368,6 +4431,176 @@ impl Service {
        Ok(())
    }

    pub(crate) async fn start_node_drain(
        self: &Arc<Self>,
        node_id: NodeId,
    ) -> Result<(), ApiError> {
        let (ongoing_op, node_available, node_policy, schedulable_nodes_count) = {
            let locked = self.inner.read().unwrap();
            let nodes = &locked.nodes;
            let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
                anyhow::anyhow!("Node {} not registered", node_id).into(),
            ))?;
            let schedulable_nodes_count = nodes
                .iter()
                .filter(|(_, n)| matches!(n.may_schedule(), MaySchedule::Yes(_)))
                .count();

            (
                locked
                    .ongoing_operation
                    .as_ref()
                    .map(|ongoing| ongoing.operation),
                node.is_available(),
                node.get_scheduling(),
                schedulable_nodes_count,
            )
        };

        if let Some(ongoing) = ongoing_op {
            return Err(ApiError::PreconditionFailed(
                format!("Background operation already ongoing for node: {}", ongoing).into(),
            ));
        }

        if !node_available {
            return Err(ApiError::ResourceUnavailable(
                format!("Node {node_id} is currently unavailable").into(),
            ));
        }

        if schedulable_nodes_count == 0 {
            return Err(ApiError::PreconditionFailed(
                "No other schedulable nodes to drain to".into(),
            ));
        }

        match node_policy {
            NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Pause => {
                self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Draining))
                    .await?;

                let cancel = CancellationToken::new();

                self.inner.write().unwrap().ongoing_operation = Some(OperationHandler {
                    operation: Operation::Drain(Drain { node_id }),
                    cancel: cancel.clone(),
                });

                tokio::task::spawn({
                    let service = self.clone();
                    let cancel = cancel.clone();
                    async move {
                        scopeguard::defer! {
                            let prev = service.inner.write().unwrap().ongoing_operation.take();

                            if let Some(Operation::Drain(removed_drain)) = prev.map(|h| h.operation) {
                                assert_eq!(removed_drain.node_id, node_id, "We always take the same operation");
                            } else {
                                panic!("We always remove the same operation")
                            }
                        }
                        service.drain_node(node_id, cancel).await
                    }
                });
            }
            NodeSchedulingPolicy::Draining => {
                return Err(ApiError::Conflict(format!(
                    "Node {node_id} has drain in progress"
                )));
            }
            policy => {
                return Err(ApiError::PreconditionFailed(
                    format!("Node {node_id} cannot be drained due to {policy:?} policy").into(),
                ));
            }
        }

        Ok(())
    }

    pub(crate) async fn start_node_fill(self: &Arc<Self>, node_id: NodeId) -> Result<(), ApiError> {
        let (ongoing_op, node_available, node_policy, total_nodes_count) = {
            let locked = self.inner.read().unwrap();
            let nodes = &locked.nodes;
            let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
                anyhow::anyhow!("Node {} not registered", node_id).into(),
            ))?;

            (
                locked
                    .ongoing_operation
                    .as_ref()
                    .map(|ongoing| ongoing.operation),
                node.is_available(),
                node.get_scheduling(),
                nodes.len(),
            )
        };

        if let Some(ongoing) = ongoing_op {
            return Err(ApiError::PreconditionFailed(
                format!("Background operation already ongoing for node: {}", ongoing).into(),
            ));
        }

        if !node_available {
            return Err(ApiError::ResourceUnavailable(
                format!("Node {node_id} is currently unavailable").into(),
            ));
        }

        if total_nodes_count <= 1 {
            return Err(ApiError::PreconditionFailed(
                "No other nodes to fill from".into(),
            ));
        }

        match node_policy {
            NodeSchedulingPolicy::Active => {
                self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Filling))
                    .await?;

                let cancel = CancellationToken::new();

                self.inner.write().unwrap().ongoing_operation = Some(OperationHandler {
                    operation: Operation::Fill(Fill { node_id }),
                    cancel: cancel.clone(),
                });

                tokio::task::spawn({
                    let service = self.clone();
                    let cancel = cancel.clone();
                    async move {
                        scopeguard::defer! {
                            let prev = service.inner.write().unwrap().ongoing_operation.take();

                            if let Some(Operation::Fill(removed_fill)) = prev.map(|h| h.operation) {
                                assert_eq!(removed_fill.node_id, node_id, "We always take the same operation");
                            } else {
                                panic!("We always remove the same operation")
                            }
                        }

                        service.fill_node(node_id, cancel).await
                    }
                });
            }
            NodeSchedulingPolicy::Filling => {
                return Err(ApiError::Conflict(format!(
                    "Node {node_id} has fill in progress"
                )));
            }
            policy => {
                return Err(ApiError::PreconditionFailed(
                    format!("Node {node_id} cannot be filled due to {policy:?} policy").into(),
                ));
            }
        }

        Ok(())
    }

    /// Helper for methods that will try and call pageserver APIs for
    /// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
    /// is attached somewhere.
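Both `start_node_drain` and `start_node_fill` guard the single `ongoing_operation` slot with `scopeguard::defer!`, so the slot is cleared however the spawned task exits. A minimal, self-contained sketch of that pattern; the `Arc<Mutex<Option<String>>>` is a hypothetical stand-in for `ServiceState::ongoing_operation`:

```rust
use std::sync::{Arc, Mutex};

use scopeguard::defer;

// Hypothetical stand-in for ServiceState::ongoing_operation.
type OngoingSlot = Arc<Mutex<Option<String>>>;

fn run_operation(slot: OngoingSlot) {
    // start_node_drain/start_node_fill install the handler before spawning the task...
    *slot.lock().unwrap() = Some("drain 1".to_string());

    // ...and the spawned task clears it on scope exit, whether the work below finishes
    // normally, returns early with an error, or panics.
    defer! {
        let prev = slot.lock().unwrap().take();
        assert!(prev.is_some(), "We always remove the operation we installed");
    }

    // Long-running drain/fill work would go here.
}

fn main() {
    let slot: OngoingSlot = Arc::new(Mutex::new(None));
    run_operation(slot.clone());
    // After the operation returns, no background operation is recorded any more.
    assert!(slot.lock().unwrap().is_none());
}
```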
@@ -4952,4 +5185,268 @@ impl Service {
        // to complete.
        self.gate.close().await;
    }

    /// Drain a node by moving the shards attached to it as primaries.
    /// This is a long running operation and it should run as a separate Tokio task.
    pub(crate) async fn drain_node(
        &self,
        node_id: NodeId,
        cancel: CancellationToken,
    ) -> Result<(), OperationError> {
        tracing::info!(%node_id, "Starting drain background operation");

        let mut last_inspected_shard: Option<TenantShardId> = None;
        let mut inspected_all_shards = false;
        let mut waiters = Vec::new();
        let mut schedule_context = ScheduleContext::default();

        while !inspected_all_shards {
            if cancel.is_cancelled() {
                return Err(OperationError::Cancelled);
            }

            {
                let mut locked = self.inner.write().unwrap();
                let (nodes, tenants, scheduler) = locked.parts_mut();

                let node = nodes.get(&node_id).ok_or(OperationError::NodeStateChanged(
                    format!("node {node_id} was removed").into(),
                ))?;

                let current_policy = node.get_scheduling();
                if !matches!(current_policy, NodeSchedulingPolicy::Draining) {
                    // TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
                    // about it
                    return Err(OperationError::NodeStateChanged(
                        format!("node {node_id} changed state to {current_policy:?}").into(),
                    ));
                }

                let mut cursor = tenants.iter_mut().skip_while({
                    let skip_past = last_inspected_shard;
                    move |(tid, _)| match skip_past {
                        Some(last) => **tid != last,
                        None => false,
                    }
                });

                while waiters.len() < MAX_RECONCILES_PER_OPERATION {
                    let (tid, tenant_shard) = match cursor.next() {
                        Some(some) => some,
                        None => {
                            inspected_all_shards = true;
                            break;
                        }
                    };

                    if tenant_shard.intent.demote_attached(scheduler, node_id) {
                        match tenant_shard.schedule(scheduler, &mut schedule_context) {
                            Err(e) => {
                                tracing::warn!(%tid, "Scheduling error when draining pageserver {} : {e}", node_id);
                            }
                            Ok(()) => {
                                let waiter = self.maybe_reconcile_shard(tenant_shard, nodes);
                                if let Some(some) = waiter {
                                    waiters.push(some);
                                }
                            }
                        }
                    }

                    last_inspected_shard = Some(*tid);
                }
            }

            waiters = self
                .await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
                .await;
        }

        while !waiters.is_empty() {
            waiters = self
                .await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
                .await;
        }

        // At this point we have done the best we could to drain shards from this node.
        // Set the node scheduling policy to [`NodeSchedulingPolicy::PauseForRestart`]
        // to complete the drain.
        if let Err(err) = self
            .node_configure(node_id, None, Some(NodeSchedulingPolicy::PauseForRestart))
            .await
        {
            // This is not fatal. Anything that is polling the node scheduling policy to detect
            // the end of the drain operations will hang, but all such places should enforce an
            // overall timeout. The scheduling policy will be updated upon node re-attach and/or
            // by the counterpart fill operation.
            tracing::warn!(%node_id, "Failed to finalise drain by setting scheduling policy: {err}");
        }

        tracing::info!(%node_id, "Completed drain background operation");

        Ok(())
    }
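The loop above holds the service lock only while it schedules one batch of at most `MAX_RECONCILES_PER_OPERATION` demotions, then drops it and waits on the resulting reconcilers; `last_inspected_shard` lets the next batch resume the walk over the shard map. A self-contained sketch of just that cursor/batching pattern, with plain integers standing in for `TenantShardId` (hypothetical numbers, not from the diff):

```rust
use std::collections::BTreeMap;

const MAX_RECONCILES_PER_OPERATION: usize = 10;

fn main() {
    // Hypothetical shard ids; the real code walks a map keyed by TenantShardId.
    let shards: BTreeMap<u32, &str> = (0u32..25).map(|i| (i, "shard")).collect();

    let mut last_inspected: Option<u32> = None;
    let mut inspected_all = false;
    let mut batches = 0;

    while !inspected_all {
        // In the real loop the lock is (re)acquired here and dropped before waiting
        // on the reconcilers spawned for this batch.
        let mut cursor = shards.iter().skip_while({
            let skip_past = last_inspected;
            move |(tid, _)| match skip_past {
                Some(last) => **tid != last,
                None => false,
            }
        });

        let mut in_flight = 0;
        while in_flight < MAX_RECONCILES_PER_OPERATION {
            match cursor.next() {
                Some((tid, _shard)) => {
                    // Real code: demote the attachment away from the drained node and
                    // schedule a reconcile, keeping its waiter.
                    in_flight += 1;
                    last_inspected = Some(*tid);
                }
                None => {
                    inspected_all = true;
                    break;
                }
            }
        }
        batches += 1;
    }

    // The cursor re-yields the shard it stopped at, so 25 shards are covered here in
    // three batches rather than strictly disjoint chunks of 10.
    assert_eq!(batches, 3);
}
```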
    /// Create a node fill plan (pick secondaries to promote) that meets the following requirements:
    /// 1. The node should be filled until it reaches the expected cluster average of
    ///    attached shards. If there are not enough secondaries on the node, the plan stops early.
    /// 2. Select tenant shards to promote such that the number of attached shards is balanced
    ///    throughout the cluster. We achieve this by picking tenant shards from each node,
    ///    starting from the ones with the largest number of attached shards, until the node
    ///    reaches the expected cluster average.
    fn fill_node_plan(&self, node_id: NodeId) -> Vec<TenantShardId> {
        let mut locked = self.inner.write().unwrap();
        let fill_requirement = locked.scheduler.compute_fill_requirement(node_id);

        let mut tids_by_node = locked
            .tenants
            .iter_mut()
            .filter_map(|(tid, tenant_shard)| {
                if tenant_shard.intent.get_secondary().contains(&node_id) {
                    if let Some(primary) = tenant_shard.intent.get_attached() {
                        return Some((*primary, *tid));
                    }
                }

                None
            })
            .into_group_map();

        let expected_attached = locked.scheduler.expected_attached_shard_count();
        let nodes_by_load = locked.scheduler.nodes_by_attached_shard_count();

        let mut plan = Vec::new();
        for (node_id, attached) in nodes_by_load {
            if plan.len() >= fill_requirement
                || tids_by_node.is_empty()
                || attached <= expected_attached
            {
                break;
            }

            let can_take = attached - expected_attached;
            let mut remove_node = false;
            for _ in 0..can_take {
                match tids_by_node.get_mut(&node_id) {
                    Some(tids) => match tids.pop() {
                        Some(tid) => {
                            plan.push(tid);
                        }
                        None => {
                            remove_node = true;
                            break;
                        }
                    },
                    None => {
                        break;
                    }
                }
            }

            if remove_node {
                tids_by_node.remove(&node_id);
            }
        }

        plan
    }
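Worked through with small, hypothetical numbers (none of these figures appear in the diff) and simplified to plain maps: two nodes hold 24 and 8 attachments, so the expected average is 16, the restarted node needs 8 promotions, and the loaded donor may give up at most 24 - 16 = 8; if fewer secondaries are available, the plan stops early.

```rust
use std::collections::HashMap;

// Simplified fill-plan selection: take promotions from the most loaded donors first,
// never dragging a donor below the cluster average, and stop once the plan is big
// enough or the candidate secondaries run out.
fn fill_node_plan(
    attached_counts: &HashMap<u64, usize>,
    mut tids_by_node: HashMap<u64, Vec<&'static str>>,
    fill_requirement: usize,
    expected_attached: usize,
) -> Vec<&'static str> {
    let mut nodes_by_load: Vec<(u64, usize)> =
        attached_counts.iter().map(|(n, c)| (*n, *c)).collect();
    nodes_by_load.sort_by(|a, b| b.1.cmp(&a.1));

    let mut plan = Vec::new();
    for (node_id, attached) in nodes_by_load {
        if plan.len() >= fill_requirement
            || tids_by_node.is_empty()
            || attached <= expected_attached
        {
            break;
        }

        let can_take = attached - expected_attached;
        if let Some(tids) = tids_by_node.get_mut(&node_id) {
            for _ in 0..can_take {
                match tids.pop() {
                    Some(tid) => plan.push(tid),
                    None => break,
                }
            }
        }
    }
    plan
}

fn main() {
    // Node 1 holds 24 attachments, node 2 (freshly restarted) holds 8; the average is 16.
    let attached_counts = HashMap::from([(1u64, 24usize), (2, 8)]);
    // Only six of node 1's tenants keep a secondary on node 2 (hypothetical ids).
    let tids_by_node = HashMap::from([(1u64, vec!["t1", "t2", "t3", "t4", "t5", "t6"])]);

    let plan = fill_node_plan(&attached_counts, tids_by_node, 8, 16);
    // The plan wanted 8 promotions but only 6 secondaries were available, so it stops early.
    assert_eq!(plan.len(), 6);
}
```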
    /// Fill a node by promoting its secondaries until the cluster is balanced
    /// with regards to attached shard counts. Note that this operation only
    /// makes sense as a counterpart to the drain implemented in [`Service::drain_node`].
    /// This is a long running operation and it should run as a separate Tokio task.
    pub(crate) async fn fill_node(
        &self,
        node_id: NodeId,
        cancel: CancellationToken,
    ) -> Result<(), OperationError> {
        // TODO(vlad): Currently this operates on the assumption that all
        // secondaries are warm. This is not always true (e.g. we just migrated the
        // tenant). Take that into consideration by checking the secondary status.

        tracing::info!(%node_id, "Starting fill background operation");

        let mut tids_to_promote = self.fill_node_plan(node_id);

        let mut waiters = Vec::new();
        let mut schedule_context = ScheduleContext::default();

        // Execute the plan we've composed above. Before applying each move from the plan,
        // we validate to ensure that it has not gone stale in the meantime.
        while !tids_to_promote.is_empty() {
            if cancel.is_cancelled() {
                return Err(OperationError::Cancelled);
            }

            {
                let mut locked = self.inner.write().unwrap();
                let (nodes, tenants, scheduler) = locked.parts_mut();

                let node = nodes.get(&node_id).ok_or(OperationError::NodeStateChanged(
                    format!("node {node_id} was removed").into(),
                ))?;

                let current_policy = node.get_scheduling();
                if !matches!(current_policy, NodeSchedulingPolicy::Filling) {
                    // TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
                    // about it
                    return Err(OperationError::NodeStateChanged(
                        format!("node {node_id} changed state to {current_policy:?}").into(),
                    ));
                }

                while waiters.len() < MAX_RECONCILES_PER_OPERATION {
                    if let Some(tid) = tids_to_promote.pop() {
                        if let Some(tenant_shard) = tenants.get_mut(&tid) {
                            // If the node being filled is not a secondary anymore,
                            // skip the promotion.
                            if !tenant_shard.intent.get_secondary().contains(&node_id) {
                                continue;
                            }

                            tenant_shard.intent.promote_attached(scheduler, node_id);
                            match tenant_shard.schedule(scheduler, &mut schedule_context) {
                                Err(e) => {
                                    tracing::warn!(%tid, "Scheduling error when filling pageserver {} : {e}", node_id);
                                }
                                Ok(()) => {
                                    if let Some(waiter) =
                                        self.maybe_reconcile_shard(tenant_shard, nodes)
                                    {
                                        waiters.push(waiter);
                                    }
                                }
                            }
                        }
                    } else {
                        break;
                    }
                }
            }

            waiters = self
                .await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
                .await;
        }

        while !waiters.is_empty() {
            waiters = self
                .await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT)
                .await;
        }

        if let Err(err) = self
            .node_configure(node_id, None, Some(NodeSchedulingPolicy::Active))
            .await
        {
            // This isn't a huge issue since the filling process starts upon request. However, it
            // will prevent the next drain from starting. The only case in which this can fail
            // is database unavailability. Such a case will require manual intervention.
            tracing::error!(%node_id, "Failed to finalise fill by setting scheduling policy: {err}");
        }

        tracing::info!(%node_id, "Completed fill background operation");

        Ok(())
    }
}
@@ -10,7 +10,9 @@ use crate::{
    reconciler::ReconcileUnits,
    scheduler::{AffinityScore, MaySchedule, RefCountUpdate, ScheduleContext},
};
use pageserver_api::controller_api::{PlacementPolicy, ShardSchedulingPolicy};
use pageserver_api::controller_api::{
    NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
};
use pageserver_api::{
    models::{LocationConfig, LocationConfigMode, TenantConfig},
    shard::{ShardIdentity, TenantShardId},

@@ -311,6 +313,12 @@ pub(crate) struct ReconcilerWaiter {
    seq: Sequence,
}

pub(crate) enum ReconcilerStatus {
    Done,
    Failed,
    InProgress,
}

#[derive(thiserror::Error, Debug)]
pub(crate) enum ReconcileWaitError {
    #[error("Timeout waiting for shard {0}")]

@@ -373,6 +381,16 @@ impl ReconcilerWaiter {

        Ok(())
    }

    pub(crate) fn get_status(&self) -> ReconcilerStatus {
        if self.seq_wait.would_wait_for(self.seq).is_err() {
            ReconcilerStatus::Done
        } else if self.error_seq_wait.would_wait_for(self.seq).is_err() {
            ReconcilerStatus::Failed
        } else {
            ReconcilerStatus::InProgress
        }
    }
}

/// Having spawned a reconciler task, the tenant shard's state will carry enough

@@ -652,13 +670,17 @@ impl TenantShard {
    let mut scores = all_pageservers
        .iter()
        .flat_map(|node_id| {
            if matches!(
                nodes
                    .get(node_id)
                    .map(|n| n.may_schedule())
                    .unwrap_or(MaySchedule::No),
                MaySchedule::No
            let node = nodes.get(node_id);
            if node.is_none() {
                None
            } else if matches!(
                node.unwrap().get_scheduling(),
                NodeSchedulingPolicy::Filling
            ) {
                // If the node is currently filling, don't count it as a candidate, to avoid
                // racing with the background fill.
                None
            } else if matches!(node.unwrap().may_schedule(), MaySchedule::No) {
                None
            } else {
                let affinity_score = schedule_context.get_node_affinity(*node_id);
@@ -2213,6 +2213,30 @@ class NeonStorageController(MetricsGetter, LogUtils):
            headers=self.headers(TokenScope.ADMIN),
        )

    def node_drain(self, node_id):
        log.info(f"node_drain({node_id})")
        self.request(
            "PUT",
            f"{self.env.storage_controller_api}/control/v1/node/{node_id}/drain",
            headers=self.headers(TokenScope.ADMIN),
        )

    def node_fill(self, node_id):
        log.info(f"node_fill({node_id})")
        self.request(
            "PUT",
            f"{self.env.storage_controller_api}/control/v1/node/{node_id}/fill",
            headers=self.headers(TokenScope.ADMIN),
        )

    def node_status(self, node_id):
        response = self.request(
            "GET",
            f"{self.env.storage_controller_api}/control/v1/node/{node_id}",
            headers=self.headers(TokenScope.ADMIN),
        )
        return response.json()

    def node_list(self):
        response = self.request(
            "GET",
@@ -1477,3 +1477,120 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto
    workload = Workload(env, tenant_id, timeline, branch_name=branch)
    workload.expect_rows = expect_rows
    workload.validate()


def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
    """
    Graceful restart of storage controller clusters uses the drain and
    fill hooks in order to migrate attachments away from pageservers before
    restarting. In practice, Ansible will drive this process.
    """
    neon_env_builder.num_pageservers = 2
    env = neon_env_builder.init_configs()
    env.start()

    tenant_count = 5
    shard_count_per_tenant = 8
    total_shards = tenant_count * shard_count_per_tenant
    tenant_ids = []

    for _ in range(0, tenant_count):
        tid = TenantId.generate()
        tenant_ids.append(tid)
        env.neon_cli.create_tenant(
            tid, placement_policy='{"Attached":1}', shard_count=shard_count_per_tenant
        )

    # Give things a chance to settle.
    # A call to `reconcile_until_idle` could be used here instead,
    # however since all attachments are placed on the same node,
    # we'd have to wait for a long time (2 minutes-ish) for optimizations
    # to quiesce.
    # TODO: once the initial attachment selection is fixed, update this
    # to use `reconcile_until_idle`.
    time.sleep(2)

    nodes = env.storage_controller.node_list()
    assert len(nodes) == 2

    def retryable_node_operation(op, ps_id, max_attempts, backoff):
        while max_attempts > 0:
            try:
                op(ps_id)
                return
            except StorageControllerApiException as e:
                max_attempts -= 1
                log.info(f"Operation failed ({max_attempts} attempts left): {e}")

                if max_attempts == 0:
                    raise e

                time.sleep(backoff)

    def poll_node_status(node_id, desired_scheduling_policy, max_attempts, backoff):
        log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy")
        while max_attempts > 0:
            try:
                status = env.storage_controller.node_status(node_id)
                policy = status["scheduling"]
                if policy == desired_scheduling_policy:
                    return
                else:
                    max_attempts -= 1
                    log.info(f"Status call returned {policy=} ({max_attempts} attempts left)")

                    if max_attempts == 0:
                        raise AssertionError(
                            f"Status for {node_id=} did not reach {desired_scheduling_policy=}"
                        )

                    time.sleep(backoff)
            except StorageControllerApiException as e:
                max_attempts -= 1
                log.info(f"Status call failed ({max_attempts} retries left): {e}")

                if max_attempts == 0:
                    raise e

                time.sleep(backoff)

    def assert_shard_counts_balanced(env: NeonEnv, shard_counts, total_shards):
        # Assert that all nodes have some attached shards
        assert len(shard_counts) == len(env.pageservers)

        min_shard_count = min(shard_counts.values())
        max_shard_count = max(shard_counts.values())

        flake_factor = 5 / 100
        assert max_shard_count - min_shard_count <= int(total_shards * flake_factor)

    # Perform a graceful rolling restart
    for ps in env.pageservers:
        retryable_node_operation(
            lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2
        )
        poll_node_status(ps.id, "PauseForRestart", max_attempts=6, backoff=5)

        shard_counts = get_node_shard_counts(env, tenant_ids)
        log.info(f"Shard counts after draining node {ps.id}: {shard_counts}")
        # Assert that we've drained the node
        assert shard_counts[str(ps.id)] == 0
        # Assert that those shards actually went somewhere
        assert sum(shard_counts.values()) == total_shards

        ps.restart()
        poll_node_status(ps.id, "Active", max_attempts=10, backoff=1)

        retryable_node_operation(
            lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2
        )
        poll_node_status(ps.id, "Active", max_attempts=6, backoff=5)

        shard_counts = get_node_shard_counts(env, tenant_ids)
        log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")
        assert_shard_counts_balanced(env, shard_counts, total_shards)

    # Now check that shards are reasonably balanced
    shard_counts = get_node_shard_counts(env, tenant_ids)
    log.info(f"Shard counts after rolling restart: {shard_counts}")
    assert_shard_counts_balanced(env, shard_counts, total_shards)