mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-16 04:30:38 +00:00
Compare commits
1 Commits
fcdm/dev-e
...
vlad/storc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
053235cf57 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -5801,6 +5801,7 @@ dependencies = [
|
||||
"r2d2",
|
||||
"reqwest 0.12.4",
|
||||
"routerify",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"strum",
|
||||
|
||||
@@ -209,6 +209,7 @@ pub enum NodeSchedulingPolicy {
|
||||
Active,
|
||||
Filling,
|
||||
Pause,
|
||||
PauseForRestart,
|
||||
Draining,
|
||||
}
|
||||
|
||||
@@ -220,6 +221,7 @@ impl FromStr for NodeSchedulingPolicy {
|
||||
"active" => Ok(Self::Active),
|
||||
"filling" => Ok(Self::Filling),
|
||||
"pause" => Ok(Self::Pause),
|
||||
"pause_for_restart" => Ok(Self::PauseForRestart),
|
||||
"draining" => Ok(Self::Draining),
|
||||
_ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
|
||||
}
|
||||
@@ -233,6 +235,7 @@ impl From<NodeSchedulingPolicy> for String {
|
||||
Active => "active",
|
||||
Filling => "filling",
|
||||
Pause => "pause",
|
||||
PauseForRestart => "pause_for_restart",
|
||||
Draining => "draining",
|
||||
}
|
||||
.to_string()
|
||||
|
||||
@@ -40,6 +40,7 @@ tokio.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tracing.workspace = true
|
||||
measured.workspace = true
|
||||
scopeguard.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
|
||||
|
||||
236
storage_controller/src/background_node_operations.rs
Normal file
236
storage_controller/src/background_node_operations.rs
Normal file
@@ -0,0 +1,236 @@
|
||||
use std::{borrow::Cow, collections::HashMap, fmt::Debug, fmt::Display, sync::Arc};
|
||||
|
||||
use tokio::{sync::mpsc::error::TrySendError, task::JoinHandle};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::id::NodeId;
|
||||
|
||||
use crate::service::Service;
|
||||
|
||||
pub(crate) const MAX_RECONCILES_PER_OPERATION: usize = 10;
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
pub(crate) struct Drain {
|
||||
node_id: NodeId,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
pub(crate) struct Fill {
|
||||
node_id: NodeId,
|
||||
}
|
||||
|
||||
pub(crate) enum Operation {
|
||||
Drain(Drain),
|
||||
Fill(Fill),
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum OperationError {
|
||||
#[error("Operation precondition failed: {0}")]
|
||||
PreconditionFailed(Cow<'static, str>),
|
||||
#[error("Node state changed during operation: {0}")]
|
||||
NodeStateChanged(Cow<'static, str>),
|
||||
#[error("Operation cancelled")]
|
||||
Cancelled,
|
||||
#[error("Shutting down")]
|
||||
ShuttingDown,
|
||||
}
|
||||
|
||||
struct OperationHandler {
|
||||
operation: Operation,
|
||||
#[allow(unused)]
|
||||
cancel: CancellationToken,
|
||||
// TODO(vlad): maybe we don't need this handle after all.
|
||||
// Unless we want to log the error at the end.
|
||||
#[allow(unused)]
|
||||
handle: JoinHandle<Result<(), OperationError>>,
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
struct OngoingOperations(Arc<std::sync::RwLock<HashMap<NodeId, OperationHandler>>>);
|
||||
|
||||
pub(crate) struct Controller {
|
||||
ongoing: OngoingOperations,
|
||||
service: Arc<Service>,
|
||||
sender: tokio::sync::mpsc::Sender<Operation>,
|
||||
}
|
||||
|
||||
// TODO(vlad): write a little python test for this
|
||||
impl Controller {
|
||||
pub(crate) fn new(service: Arc<Service>) -> (Self, tokio::sync::mpsc::Receiver<Operation>) {
|
||||
let (operations_tx, operations_rx) = tokio::sync::mpsc::channel(1);
|
||||
(
|
||||
Self {
|
||||
ongoing: Default::default(),
|
||||
service,
|
||||
sender: operations_tx,
|
||||
},
|
||||
operations_rx,
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn drain_node(&self, node_id: NodeId) -> Result<(), OperationError> {
|
||||
if let Some(handler) = self.ongoing.0.read().unwrap().get(&node_id) {
|
||||
return Err(OperationError::PreconditionFailed(
|
||||
format!(
|
||||
"Background operation already ongoing for node: {}",
|
||||
handler.operation
|
||||
)
|
||||
.into(),
|
||||
));
|
||||
}
|
||||
|
||||
self.sender.try_send(Operation::Drain(Drain { node_id }))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn fill_node(&self, node_id: NodeId) -> Result<(), OperationError> {
|
||||
if let Some(handler) = self.ongoing.0.read().unwrap().get(&node_id) {
|
||||
return Err(OperationError::PreconditionFailed(
|
||||
format!(
|
||||
"Background operation already ongoing for node: {}",
|
||||
handler.operation
|
||||
)
|
||||
.into(),
|
||||
));
|
||||
}
|
||||
|
||||
self.sender.try_send(Operation::Fill(Fill { node_id }))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_operations(
|
||||
&self,
|
||||
mut receiver: tokio::sync::mpsc::Receiver<Operation>,
|
||||
) {
|
||||
while let Some(op) = receiver.recv().await {
|
||||
match op {
|
||||
Operation::Drain(drain) => self.handle_drain(drain),
|
||||
Operation::Fill(fill) => self.handle_fill(fill),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_drain(&self, drain: Drain) {
|
||||
let node_id = drain.node_id;
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let (_holder, waiter) = utils::completion::channel();
|
||||
|
||||
let handle = tokio::task::spawn({
|
||||
let service = self.service.clone();
|
||||
let ongoing = self.ongoing.clone();
|
||||
let cancel = cancel.clone();
|
||||
|
||||
async move {
|
||||
scopeguard::defer! {
|
||||
let removed = ongoing.0.write().unwrap().remove(&drain.node_id);
|
||||
if let Some(Operation::Drain(removed_drain)) = removed.map(|h| h.operation) {
|
||||
assert_eq!(removed_drain.node_id, drain.node_id, "We always remove the same operation");
|
||||
} else {
|
||||
panic!("We always remove the same operation")
|
||||
}
|
||||
}
|
||||
|
||||
waiter.wait().await;
|
||||
service.drain_node(drain.node_id, cancel).await
|
||||
}
|
||||
});
|
||||
|
||||
let replaced = self.ongoing.0.write().unwrap().insert(
|
||||
node_id,
|
||||
OperationHandler {
|
||||
operation: Operation::Drain(drain),
|
||||
cancel,
|
||||
handle,
|
||||
},
|
||||
);
|
||||
|
||||
assert!(
|
||||
replaced.is_none(),
|
||||
"The channel size is 1 and we checked before enqueing"
|
||||
);
|
||||
}
|
||||
|
||||
fn handle_fill(&self, fill: Fill) {
|
||||
let node_id = fill.node_id;
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let (_holder, waiter) = utils::completion::channel();
|
||||
|
||||
let handle = tokio::task::spawn({
|
||||
let service = self.service.clone();
|
||||
let ongoing = self.ongoing.clone();
|
||||
let cancel = cancel.clone();
|
||||
|
||||
async move {
|
||||
scopeguard::defer! {
|
||||
let removed = ongoing.0.write().unwrap().remove(&fill.node_id);
|
||||
if let Some(Operation::Fill(removed_fill)) = removed.map(|h| h.operation) {
|
||||
assert_eq!(removed_fill.node_id, fill.node_id, "We always remove the same operation");
|
||||
} else {
|
||||
panic!("We always remove the same operation")
|
||||
}
|
||||
}
|
||||
|
||||
waiter.wait().await;
|
||||
service.fill_node(fill.node_id, cancel).await
|
||||
}
|
||||
});
|
||||
|
||||
let replaced = self.ongoing.0.write().unwrap().insert(
|
||||
node_id,
|
||||
OperationHandler {
|
||||
operation: Operation::Fill(fill),
|
||||
cancel,
|
||||
handle,
|
||||
},
|
||||
);
|
||||
|
||||
assert!(
|
||||
replaced.is_none(),
|
||||
"The channel size is 1 and we checked before enqueing"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<TrySendError<T>> for OperationError {
|
||||
fn from(value: TrySendError<T>) -> Self {
|
||||
match value {
|
||||
TrySendError::Full(_) => {
|
||||
Self::PreconditionFailed("Too many background operation in progress".into())
|
||||
}
|
||||
TrySendError::Closed(_) => Self::ShuttingDown,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for Drain {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "drain {}", self.node_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for Fill {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "fill {}", self.node_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for Operation {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
match self {
|
||||
Operation::Drain(op) => write!(f, "{op}"),
|
||||
Operation::Fill(op) => write!(f, "{op}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for Controller {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "backround_node_operations::Controller")
|
||||
}
|
||||
}
|
||||
@@ -480,6 +480,39 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>,
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let state = get_state(&req);
|
||||
let node_id: NodeId = parse_request_param(&req, "node_id")?;
|
||||
|
||||
let node_status = state.service.get_node_status(node_id).await?;
|
||||
|
||||
json_response(StatusCode::OK, node_status)
|
||||
}
|
||||
|
||||
async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let state = get_state(&req);
|
||||
let node_id: NodeId = parse_request_param(&req, "node_id")?;
|
||||
|
||||
state.service.start_node_drain(node_id).await?;
|
||||
|
||||
json_response(StatusCode::ACCEPTED, ())
|
||||
}
|
||||
|
||||
async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let state = get_state(&req);
|
||||
let node_id: NodeId = parse_request_param(&req, "node_id")?;
|
||||
|
||||
state.service.start_node_fill(node_id).await?;
|
||||
|
||||
json_response(StatusCode::ACCEPTED, ())
|
||||
}
|
||||
|
||||
async fn handle_tenant_shard_split(
|
||||
service: Arc<Service>,
|
||||
mut req: Request<Body>,
|
||||
@@ -832,6 +865,16 @@ pub fn make_router(
|
||||
RequestName("control_v1_node_config"),
|
||||
)
|
||||
})
|
||||
.get("/control/v1/node/:node_id", |r| {
|
||||
named_request_span(r, handle_node_status, RequestName("control_v1_node_status"))
|
||||
})
|
||||
.put("/control/v1/node/:node_id/drain", |r| {
|
||||
named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
|
||||
})
|
||||
.put("/control/v1/node/:node_id/fill", |r| {
|
||||
named_request_span(r, handle_node_fill, RequestName("control_v1_node_fill"))
|
||||
})
|
||||
// TODO(vlad): endpoint for cancelling drain and fill
|
||||
// Tenant Shard operations
|
||||
.put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
|
||||
tenant_service_handler(
|
||||
|
||||
@@ -2,6 +2,7 @@ use serde::Serialize;
|
||||
use utils::seqwait::MonotonicCounter;
|
||||
|
||||
mod auth;
|
||||
mod background_node_operations;
|
||||
mod compute_hook;
|
||||
mod heartbeater;
|
||||
pub mod http;
|
||||
|
||||
@@ -59,12 +59,17 @@ impl Node {
|
||||
self.id
|
||||
}
|
||||
|
||||
pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy {
|
||||
self.scheduling
|
||||
}
|
||||
|
||||
pub(crate) fn set_scheduling(&mut self, scheduling: NodeSchedulingPolicy) {
|
||||
self.scheduling = scheduling
|
||||
}
|
||||
|
||||
/// Does this registration request match `self`? This is used when deciding whether a registration
|
||||
/// request should be allowed to update an existing record with the same node ID.
|
||||
/// how
|
||||
pub(crate) fn registration_match(&self, register_req: &NodeRegisterRequest) -> bool {
|
||||
self.id == register_req.node_id
|
||||
&& self.listen_http_addr == register_req.listen_http_addr
|
||||
@@ -141,6 +146,7 @@ impl Node {
|
||||
NodeSchedulingPolicy::Draining => MaySchedule::No,
|
||||
NodeSchedulingPolicy::Filling => MaySchedule::Yes(score),
|
||||
NodeSchedulingPolicy::Pause => MaySchedule::No,
|
||||
NodeSchedulingPolicy::PauseForRestart => MaySchedule::No,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -157,7 +163,7 @@ impl Node {
|
||||
listen_http_port,
|
||||
listen_pg_addr,
|
||||
listen_pg_port,
|
||||
scheduling: NodeSchedulingPolicy::Filling,
|
||||
scheduling: NodeSchedulingPolicy::Active,
|
||||
availability: NodeAvailability::Offline,
|
||||
cancel: CancellationToken::new(),
|
||||
}
|
||||
|
||||
@@ -442,13 +442,15 @@ impl Persistence {
|
||||
#[tracing::instrument(skip_all, fields(node_id))]
|
||||
pub(crate) async fn re_attach(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
input_node_id: NodeId,
|
||||
) -> DatabaseResult<HashMap<TenantShardId, Generation>> {
|
||||
use crate::schema::nodes::dsl::scheduling_policy;
|
||||
use crate::schema::nodes::dsl::*;
|
||||
use crate::schema::tenant_shards::dsl::*;
|
||||
let updated = self
|
||||
.with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
|
||||
let rows_updated = diesel::update(tenant_shards)
|
||||
.filter(generation_pageserver.eq(node_id.0 as i64))
|
||||
.filter(generation_pageserver.eq(input_node_id.0 as i64))
|
||||
.set(generation.eq(generation + 1))
|
||||
.execute(conn)?;
|
||||
|
||||
@@ -457,9 +459,23 @@ impl Persistence {
|
||||
// TODO: UPDATE+SELECT in one query
|
||||
|
||||
let updated = tenant_shards
|
||||
.filter(generation_pageserver.eq(node_id.0 as i64))
|
||||
.filter(generation_pageserver.eq(input_node_id.0 as i64))
|
||||
.select(TenantShardPersistence::as_select())
|
||||
.load(conn)?;
|
||||
|
||||
// If the node went through a drain and restart phase before re-attaching,
|
||||
// then reset its node scheduling policy to active.
|
||||
diesel::update(nodes)
|
||||
.filter(node_id.eq(input_node_id.0 as i64))
|
||||
.filter(
|
||||
scheduling_policy
|
||||
// TODO(vlad): Do the same for `Filling`
|
||||
.eq(String::from(NodeSchedulingPolicy::PauseForRestart))
|
||||
.or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Draining))),
|
||||
)
|
||||
.set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active)))
|
||||
.execute(conn)?;
|
||||
|
||||
Ok(updated)
|
||||
})
|
||||
.await?;
|
||||
|
||||
@@ -29,6 +29,8 @@ pub enum MaySchedule {
|
||||
struct SchedulerNode {
|
||||
/// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
|
||||
shard_count: usize,
|
||||
/// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
|
||||
attached_shard_count: usize,
|
||||
|
||||
/// Whether this node is currently eligible to have new shards scheduled (this is derived
|
||||
/// from a node's availability state and scheduling policy).
|
||||
@@ -42,7 +44,9 @@ impl PartialEq for SchedulerNode {
|
||||
(MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
|
||||
);
|
||||
|
||||
may_schedule_matches && self.shard_count == other.shard_count
|
||||
may_schedule_matches
|
||||
&& self.shard_count == other.shard_count
|
||||
&& self.attached_shard_count == other.attached_shard_count
|
||||
}
|
||||
}
|
||||
|
||||
@@ -138,6 +142,15 @@ impl ScheduleContext {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) enum RefCountUpdate {
|
||||
PromoteSecondary,
|
||||
Attach,
|
||||
Detach,
|
||||
DemoteAttached,
|
||||
AddSecondary,
|
||||
RemoveSecondary,
|
||||
}
|
||||
|
||||
impl Scheduler {
|
||||
pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
|
||||
let mut scheduler_nodes = HashMap::new();
|
||||
@@ -146,6 +159,7 @@ impl Scheduler {
|
||||
node.get_id(),
|
||||
SchedulerNode {
|
||||
shard_count: 0,
|
||||
attached_shard_count: 0,
|
||||
may_schedule: node.may_schedule(),
|
||||
},
|
||||
);
|
||||
@@ -171,6 +185,7 @@ impl Scheduler {
|
||||
node.get_id(),
|
||||
SchedulerNode {
|
||||
shard_count: 0,
|
||||
attached_shard_count: 0,
|
||||
may_schedule: node.may_schedule(),
|
||||
},
|
||||
);
|
||||
@@ -179,7 +194,10 @@ impl Scheduler {
|
||||
for shard in shards {
|
||||
if let Some(node_id) = shard.intent.get_attached() {
|
||||
match expect_nodes.get_mut(node_id) {
|
||||
Some(node) => node.shard_count += 1,
|
||||
Some(node) => {
|
||||
node.shard_count += 1;
|
||||
node.attached_shard_count += 1;
|
||||
}
|
||||
None => anyhow::bail!(
|
||||
"Tenant {} references nonexistent node {}",
|
||||
shard.tenant_shard_id,
|
||||
@@ -227,31 +245,67 @@ impl Scheduler {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Increment the reference count of a node. This reference count is used to guide scheduling
|
||||
/// decisions, not for memory management: it represents one tenant shard whose IntentState targets
|
||||
/// this node.
|
||||
/// Update the reference counts of a node. These reference counts are used to guide scheduling
|
||||
/// decisions, not for memory management: they represent the number of tenant shards whose IntentState
|
||||
/// targets this node and the number of tenant shards whose IntentState is attached to this
|
||||
/// node.
|
||||
///
|
||||
/// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
|
||||
/// [`Self::new`] or [`Self::node_upsert`])
|
||||
pub(crate) fn node_inc_ref(&mut self, node_id: NodeId) {
|
||||
pub(crate) fn update_node_ref_counts(&mut self, node_id: NodeId, update: RefCountUpdate) {
|
||||
let Some(node) = self.nodes.get_mut(&node_id) else {
|
||||
tracing::error!("Scheduler missing node {node_id}");
|
||||
debug_assert!(false);
|
||||
tracing::error!("Scheduler missing node {node_id}");
|
||||
return;
|
||||
};
|
||||
|
||||
node.shard_count += 1;
|
||||
match update {
|
||||
RefCountUpdate::PromoteSecondary => {
|
||||
node.attached_shard_count += 1;
|
||||
}
|
||||
RefCountUpdate::Attach => {
|
||||
node.shard_count += 1;
|
||||
node.attached_shard_count += 1;
|
||||
}
|
||||
RefCountUpdate::Detach => {
|
||||
node.shard_count -= 1;
|
||||
node.attached_shard_count -= 1;
|
||||
}
|
||||
RefCountUpdate::DemoteAttached => {
|
||||
node.attached_shard_count -= 1;
|
||||
}
|
||||
RefCountUpdate::AddSecondary => {
|
||||
node.shard_count += 1;
|
||||
}
|
||||
RefCountUpdate::RemoveSecondary => {
|
||||
node.shard_count -= 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Decrement a node's reference count. Inverse of [`Self::node_inc_ref`].
|
||||
pub(crate) fn node_dec_ref(&mut self, node_id: NodeId) {
|
||||
let Some(node) = self.nodes.get_mut(&node_id) else {
|
||||
// Check if the number of shards attached to a given node is lagging below
|
||||
// the cluster average. If that's the case, the node should be filled.
|
||||
// TODO(vlad): We can probably be smarter about this. This could be expressed
|
||||
// as standard deviation of attached shards while excluding paused nodes.
|
||||
pub(crate) fn should_fill_node(&self, node_id: NodeId) -> bool {
|
||||
let Some(node) = self.nodes.get(&node_id) else {
|
||||
debug_assert!(false);
|
||||
tracing::error!("Scheduler missing node {node_id}");
|
||||
return;
|
||||
return true;
|
||||
};
|
||||
|
||||
node.shard_count -= 1;
|
||||
let total_attached_shards: usize =
|
||||
self.nodes.values().map(|n| n.attached_shard_count).sum();
|
||||
|
||||
assert!(!self.nodes.is_empty());
|
||||
let expected_attached_shards_per_node = total_attached_shards / self.nodes.len();
|
||||
|
||||
// TODO(vlad): remove this
|
||||
for (node_id, node) in self.nodes.iter() {
|
||||
tracing::info!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node);
|
||||
}
|
||||
|
||||
node.attached_shard_count < expected_attached_shards_per_node
|
||||
}
|
||||
|
||||
pub(crate) fn node_upsert(&mut self, node: &Node) {
|
||||
@@ -263,6 +317,7 @@ impl Scheduler {
|
||||
Vacant(entry) => {
|
||||
entry.insert(SchedulerNode {
|
||||
shard_count: 0,
|
||||
attached_shard_count: 0,
|
||||
may_schedule: node.may_schedule(),
|
||||
});
|
||||
}
|
||||
|
||||
@@ -8,13 +8,15 @@ use std::{
|
||||
};
|
||||
|
||||
use crate::{
|
||||
background_node_operations::{self, Controller, OperationError, MAX_RECONCILES_PER_OPERATION},
|
||||
compute_hook::NotifyError,
|
||||
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, WrappedWriteGuard},
|
||||
persistence::{AbortShardSplitStatus, TenantFilter},
|
||||
reconciler::{ReconcileError, ReconcileUnits},
|
||||
scheduler::{ScheduleContext, ScheduleMode},
|
||||
scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
|
||||
tenant_shard::{
|
||||
MigrateAttachment, ReconcileNeeded, ScheduleOptimization, ScheduleOptimizationAction,
|
||||
MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization,
|
||||
ScheduleOptimizationAction,
|
||||
},
|
||||
};
|
||||
use anyhow::Context;
|
||||
@@ -275,6 +277,8 @@ pub struct Service {
|
||||
/// use a VecDeque instead of a channel to reduce synchronization overhead, at the cost of some code complexity.
|
||||
delayed_reconcile_tx: tokio::sync::mpsc::Sender<TenantShardId>,
|
||||
|
||||
background_operations_controller: std::sync::OnceLock<Controller>,
|
||||
|
||||
// Process shutdown will fire this token
|
||||
cancel: CancellationToken,
|
||||
|
||||
@@ -296,6 +300,19 @@ impl From<ReconcileWaitError> for ApiError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<OperationError> for ApiError {
|
||||
fn from(value: OperationError) -> Self {
|
||||
match value {
|
||||
OperationError::PreconditionFailed(err) => ApiError::PreconditionFailed(err.into()),
|
||||
OperationError::NodeStateChanged(err) => {
|
||||
ApiError::InternalServerError(anyhow::anyhow!(err))
|
||||
}
|
||||
OperationError::ShuttingDown => ApiError::ShuttingDown,
|
||||
OperationError::Cancelled => ApiError::Conflict("Operation was cancelled".into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
enum TenantCreateOrUpdate {
|
||||
Create(TenantCreateRequest),
|
||||
@@ -1090,12 +1107,20 @@ impl Service {
|
||||
delayed_reconcile_tx,
|
||||
abort_tx,
|
||||
startup_complete: startup_complete.clone(),
|
||||
background_operations_controller: Default::default(),
|
||||
cancel,
|
||||
gate: Gate::default(),
|
||||
tenant_op_locks: Default::default(),
|
||||
node_op_locks: Default::default(),
|
||||
});
|
||||
|
||||
let (controller, node_operations_receiver) =
|
||||
background_node_operations::Controller::new(this.clone());
|
||||
|
||||
this.background_operations_controller
|
||||
.set(controller)
|
||||
.expect("This is the only code path that sets the controller");
|
||||
|
||||
let result_task_this = this.clone();
|
||||
tokio::task::spawn(async move {
|
||||
// Block shutdown until we're done (we must respect self.cancel)
|
||||
@@ -1167,6 +1192,19 @@ impl Service {
|
||||
}
|
||||
});
|
||||
|
||||
tokio::task::spawn({
|
||||
let this = this.clone();
|
||||
let startup_complete = startup_complete.clone();
|
||||
async move {
|
||||
startup_complete.wait().await;
|
||||
this.background_operations_controller
|
||||
.get()
|
||||
.expect("Initialized at start up")
|
||||
.handle_operations(node_operations_receiver)
|
||||
.await;
|
||||
}
|
||||
});
|
||||
|
||||
Ok(this)
|
||||
}
|
||||
|
||||
@@ -1562,15 +1600,30 @@ impl Service {
|
||||
// Setting a node active unblocks any Reconcilers that might write to the location config API,
|
||||
// but those requests will not be accepted by the node until it has finished processing
|
||||
// the re-attach response.
|
||||
//
|
||||
// Additionally, reset the nodes scheduling policy to match the conditional update done
|
||||
// in [`Persistence::re_attach`].
|
||||
if let Some(node) = nodes.get(&reattach_req.node_id) {
|
||||
if !node.is_available() {
|
||||
let reset_scheduling = matches!(
|
||||
node.get_scheduling(),
|
||||
NodeSchedulingPolicy::PauseForRestart | NodeSchedulingPolicy::Draining
|
||||
);
|
||||
|
||||
if !node.is_available() || reset_scheduling {
|
||||
let mut new_nodes = (**nodes).clone();
|
||||
if let Some(node) = new_nodes.get_mut(&reattach_req.node_id) {
|
||||
node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
|
||||
if !node.is_available() {
|
||||
node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
|
||||
}
|
||||
|
||||
if reset_scheduling {
|
||||
node.set_scheduling(NodeSchedulingPolicy::Active);
|
||||
}
|
||||
|
||||
scheduler.node_upsert(node);
|
||||
let new_nodes = Arc::new(new_nodes);
|
||||
*nodes = new_nodes;
|
||||
}
|
||||
let new_nodes = Arc::new(new_nodes);
|
||||
*nodes = new_nodes;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1851,6 +1904,23 @@ impl Service {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn kick_waiters(
|
||||
&self,
|
||||
waiters: Vec<ReconcilerWaiter>,
|
||||
timeout: Duration,
|
||||
) -> Vec<ReconcilerWaiter> {
|
||||
let deadline = Instant::now().checked_add(timeout).unwrap();
|
||||
for waiter in waiters.iter() {
|
||||
let timeout = deadline.duration_since(Instant::now());
|
||||
let _ = waiter.wait_timeout(timeout).await;
|
||||
}
|
||||
|
||||
waiters
|
||||
.into_iter()
|
||||
.filter(|waiter| matches!(waiter.get_status(), ReconcilerStatus::InProgress))
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
/// Part of [`Self::tenant_location_config`]: dissect an incoming location config request,
|
||||
/// and transform it into either a tenant creation or a series of shard updates.
|
||||
///
|
||||
@@ -4128,6 +4198,18 @@ impl Service {
|
||||
Ok(nodes)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_node_status(&self, node_id: NodeId) -> Result<Node, ApiError> {
|
||||
self.inner
|
||||
.read()
|
||||
.unwrap()
|
||||
.nodes
|
||||
.get(&node_id)
|
||||
.cloned()
|
||||
.ok_or(ApiError::NotFound(
|
||||
format!("Node {node_id} not registered").into(),
|
||||
))
|
||||
}
|
||||
|
||||
pub(crate) async fn node_register(
|
||||
&self,
|
||||
register_req: NodeRegisterRequest,
|
||||
@@ -4282,9 +4364,6 @@ impl Service {
|
||||
|
||||
if let Some(scheduling) = scheduling {
|
||||
node.set_scheduling(scheduling);
|
||||
|
||||
// TODO: once we have a background scheduling ticker for fill/drain, kick it
|
||||
// to wake up and start working.
|
||||
}
|
||||
|
||||
// Update the scheduler, in case the eligibility of the node for new shards has changed
|
||||
@@ -4312,7 +4391,7 @@ impl Service {
|
||||
continue;
|
||||
}
|
||||
|
||||
if tenant_shard.intent.demote_attached(node_id) {
|
||||
if tenant_shard.intent.demote_attached(scheduler, node_id) {
|
||||
tenant_shard.sequence = tenant_shard.sequence.next();
|
||||
|
||||
// TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters
|
||||
@@ -4359,7 +4438,7 @@ impl Service {
|
||||
// TODO: in the background, we should balance work back onto this pageserver
|
||||
}
|
||||
AvailabilityTransition::Unchanged => {
|
||||
tracing::debug!("Node {} no change during config", node_id);
|
||||
tracing::debug!("Node {} no availability change during config", node_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4368,6 +4447,112 @@ impl Service {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn start_node_drain(&self, node_id: NodeId) -> Result<(), ApiError> {
|
||||
let (node_available, node_policy, schedulable_nodes_count) = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let nodes = &locked.nodes;
|
||||
let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
|
||||
anyhow::anyhow!("Node {} not registered", node_id).into(),
|
||||
))?;
|
||||
let schedulable_nodes_count = nodes
|
||||
.iter()
|
||||
.filter(|(_, n)| matches!(n.may_schedule(), MaySchedule::Yes(_)))
|
||||
.count();
|
||||
|
||||
(
|
||||
node.is_available(),
|
||||
node.get_scheduling(),
|
||||
schedulable_nodes_count,
|
||||
)
|
||||
};
|
||||
|
||||
if !node_available {
|
||||
return Err(ApiError::ResourceUnavailable(
|
||||
format!("Node {node_id} is currently unavailable").into(),
|
||||
));
|
||||
}
|
||||
|
||||
if schedulable_nodes_count == 0 {
|
||||
return Err(ApiError::PreconditionFailed(
|
||||
"No other schedulable nodes to drain to".into(),
|
||||
));
|
||||
}
|
||||
|
||||
match node_policy {
|
||||
NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Pause => {
|
||||
self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Draining))
|
||||
.await?;
|
||||
let controller = self
|
||||
.background_operations_controller
|
||||
.get()
|
||||
.expect("Initialized at start up");
|
||||
controller.drain_node(node_id)?;
|
||||
}
|
||||
NodeSchedulingPolicy::Draining => {
|
||||
return Err(ApiError::Conflict(format!(
|
||||
"Node {node_id} has drain in progress"
|
||||
)));
|
||||
}
|
||||
policy => {
|
||||
return Err(ApiError::PreconditionFailed(
|
||||
format!("Node {node_id} cannot be drained due to {policy:?} policy").into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn start_node_fill(&self, node_id: NodeId) -> Result<(), ApiError> {
|
||||
let (node_available, node_policy, total_nodes_count) = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let nodes = &locked.nodes;
|
||||
let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
|
||||
anyhow::anyhow!("Node {} not registered", node_id).into(),
|
||||
))?;
|
||||
|
||||
(node.is_available(), node.get_scheduling(), nodes.len())
|
||||
};
|
||||
|
||||
if !node_available {
|
||||
return Err(ApiError::ResourceUnavailable(
|
||||
format!("Node {node_id} is currently unavailable").into(),
|
||||
));
|
||||
}
|
||||
|
||||
if total_nodes_count <= 1 {
|
||||
return Err(ApiError::PreconditionFailed(
|
||||
"No other nodes to fill from".into(),
|
||||
));
|
||||
}
|
||||
|
||||
match node_policy {
|
||||
// TODO: clear Draining, Filling and PauseForRestart on storage controller restart and
|
||||
// re-attach
|
||||
NodeSchedulingPolicy::Active => {
|
||||
self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Filling))
|
||||
.await?;
|
||||
let controller = self
|
||||
.background_operations_controller
|
||||
.get()
|
||||
.expect("Initialized at start up");
|
||||
controller.fill_node(node_id)?;
|
||||
}
|
||||
NodeSchedulingPolicy::Filling => {
|
||||
return Err(ApiError::Conflict(format!(
|
||||
"Node {node_id} has fill in progress"
|
||||
)));
|
||||
}
|
||||
policy => {
|
||||
return Err(ApiError::PreconditionFailed(
|
||||
format!("Node {node_id} cannot be filled due to {policy:?} policy").into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Helper for methods that will try and call pageserver APIs for
|
||||
/// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
|
||||
/// is attached somewhere.
|
||||
@@ -4952,4 +5137,197 @@ impl Service {
|
||||
// to complete.
|
||||
self.gate.close().await;
|
||||
}
|
||||
|
||||
pub(crate) async fn drain_node(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<(), OperationError> {
|
||||
tracing::info!(%node_id, "Starting drain background operation");
|
||||
|
||||
let last_inspected_shard: Option<TenantShardId> = None;
|
||||
let mut inspected_all_shards = false;
|
||||
let mut waiters = Vec::new();
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
|
||||
while !inspected_all_shards {
|
||||
if cancel.is_cancelled() {
|
||||
return Err(OperationError::Cancelled);
|
||||
}
|
||||
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
tracing::info!(%node_id, "Drain got write lock");
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
let node = nodes.get(&node_id).ok_or(OperationError::NodeStateChanged(
|
||||
format!("node {node_id} was removed").into(),
|
||||
))?;
|
||||
|
||||
let current_policy = node.get_scheduling();
|
||||
if !matches!(current_policy, NodeSchedulingPolicy::Draining) {
|
||||
// TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
|
||||
// about it
|
||||
return Err(OperationError::NodeStateChanged(
|
||||
format!("node {node_id} changed state to {current_policy:?}").into(),
|
||||
));
|
||||
}
|
||||
|
||||
let mut cursor =
|
||||
tenants
|
||||
.iter_mut()
|
||||
.skip_while(|(tid, _)| match last_inspected_shard {
|
||||
Some(last) => **tid != last,
|
||||
None => false,
|
||||
});
|
||||
|
||||
while waiters.len() < MAX_RECONCILES_PER_OPERATION {
|
||||
let (tid, tenant_shard) = match cursor.next() {
|
||||
Some(some) => some,
|
||||
None => {
|
||||
inspected_all_shards = true;
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if tenant_shard.intent.demote_attached(scheduler, node_id) {
|
||||
tenant_shard.sequence = tenant_shard.sequence.next();
|
||||
match tenant_shard.schedule(scheduler, &mut schedule_context) {
|
||||
Err(e) => {
|
||||
tracing::warn!(%tid, "Scheduling error when draining pageserver {} : {e}", node_id);
|
||||
}
|
||||
Ok(()) => {
|
||||
let waiter = self.maybe_reconcile_shard(tenant_shard, nodes);
|
||||
if let Some(some) = waiter {
|
||||
waiters.push(some);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
tracing::info!(%node_id, "Drain released write lock");
|
||||
|
||||
waiters = self.kick_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await;
|
||||
}
|
||||
|
||||
let _ = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await;
|
||||
|
||||
// At this point we have done the best we could to drain shards from this node.
|
||||
// Set the node scheduling policy to `[NodeSchedulingPolicy::PauseForRestart]`
|
||||
// to complete the drain.
|
||||
if let Err(err) = self
|
||||
.node_configure(node_id, None, Some(NodeSchedulingPolicy::PauseForRestart))
|
||||
.await
|
||||
{
|
||||
// This is not fatal. Anything that is polling the node scheduling policy to detect
|
||||
// the end of the drain operations will hang, but all such places should enforce an
|
||||
// overall timeout. The scheduling policy will be updated upon node re-attach and/or
|
||||
// by the counterpart fill operation.
|
||||
tracing::warn!(%node_id, "Failed to finalise drain by setting scheduling policy: {err}");
|
||||
}
|
||||
|
||||
tracing::info!(%node_id, "Completed drain background operation");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// TODO: exit early when we are balanced
|
||||
pub(crate) async fn fill_node(
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<(), OperationError> {
|
||||
// TODO(vlad): Currently this operates on the assumption that all
|
||||
// secondaries are warm. This is not always true (we just migrated the
|
||||
// tenant). Take that into consideration by checking the secondary status.
|
||||
|
||||
// TODO(vlad): This is a subset of the optimizations applied by `Service::optimize_all`.
|
||||
// The optimizations should be stopped while filing, othwewise we are going to fight for
|
||||
// the same reconciler semaphore units.
|
||||
|
||||
tracing::info!(%node_id, "Starting fill background operation");
|
||||
|
||||
let last_inspected_shard: Option<TenantShardId> = None;
|
||||
let mut shards_left = true;
|
||||
let mut waiters = Vec::new();
|
||||
|
||||
let mut should_fill_node = {
|
||||
self.inner
|
||||
.write()
|
||||
.unwrap()
|
||||
.scheduler
|
||||
.should_fill_node(node_id)
|
||||
};
|
||||
|
||||
while shards_left && should_fill_node {
|
||||
if cancel.is_cancelled() {
|
||||
return Err(OperationError::Cancelled);
|
||||
}
|
||||
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
let node = nodes.get(&node_id).ok_or(OperationError::NodeStateChanged(
|
||||
format!("node {node_id} was removed").into(),
|
||||
))?;
|
||||
|
||||
let current_policy = node.get_scheduling();
|
||||
if !matches!(current_policy, NodeSchedulingPolicy::Filling) {
|
||||
// TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
|
||||
// about it
|
||||
return Err(OperationError::NodeStateChanged(
|
||||
format!("node {node_id} changed state to {current_policy:?}").into(),
|
||||
));
|
||||
}
|
||||
|
||||
let mut cursor =
|
||||
tenants
|
||||
.iter_mut()
|
||||
.skip_while(|(tid, _)| match last_inspected_shard {
|
||||
Some(last) => **tid != last,
|
||||
None => false,
|
||||
});
|
||||
|
||||
while waiters.len() < MAX_RECONCILES_PER_OPERATION {
|
||||
let (_tid, tenant_shard) = match cursor.next() {
|
||||
Some(some) => some,
|
||||
None => {
|
||||
shards_left = false;
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if tenant_shard.intent.get_secondary().contains(&node_id) {
|
||||
tenant_shard.intent.promote_attached(scheduler, node_id);
|
||||
let waiter = self.maybe_reconcile_shard(tenant_shard, nodes);
|
||||
if let Some(some) = waiter {
|
||||
waiters.push(some);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
should_fill_node = scheduler.should_fill_node(node_id);
|
||||
}
|
||||
|
||||
waiters = self.kick_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await;
|
||||
}
|
||||
|
||||
let _ = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await;
|
||||
|
||||
if let Err(err) = self
|
||||
.node_configure(node_id, None, Some(NodeSchedulingPolicy::Active))
|
||||
.await
|
||||
{
|
||||
// This isn't a huge issue since the filling process starts upon request. However, it
|
||||
// will prevent the next drain from starting. The only case in which this can fail
|
||||
// is database unavailability. Such a case will require manual intervention.
|
||||
tracing::error!(%node_id, "Failed to finalise fill by setting scheduling policy: {err}");
|
||||
}
|
||||
|
||||
tracing::info!(%node_id, "Completed fill background operation");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,9 +8,11 @@ use crate::{
|
||||
metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
|
||||
persistence::TenantShardPersistence,
|
||||
reconciler::ReconcileUnits,
|
||||
scheduler::{AffinityScore, MaySchedule, ScheduleContext},
|
||||
scheduler::{AffinityScore, MaySchedule, RefCountUpdate, ScheduleContext},
|
||||
};
|
||||
use pageserver_api::controller_api::{
|
||||
NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
|
||||
};
|
||||
use pageserver_api::controller_api::{PlacementPolicy, ShardSchedulingPolicy};
|
||||
use pageserver_api::{
|
||||
models::{LocationConfig, LocationConfigMode, TenantConfig},
|
||||
shard::{ShardIdentity, TenantShardId},
|
||||
@@ -153,7 +155,7 @@ impl IntentState {
|
||||
}
|
||||
pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
|
||||
if let Some(node_id) = node_id {
|
||||
scheduler.node_inc_ref(node_id);
|
||||
scheduler.update_node_ref_counts(node_id, RefCountUpdate::Attach);
|
||||
}
|
||||
Self {
|
||||
attached: node_id,
|
||||
@@ -164,10 +166,10 @@ impl IntentState {
|
||||
pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
|
||||
if self.attached != new_attached {
|
||||
if let Some(old_attached) = self.attached.take() {
|
||||
scheduler.node_dec_ref(old_attached);
|
||||
scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
|
||||
}
|
||||
if let Some(new_attached) = &new_attached {
|
||||
scheduler.node_inc_ref(*new_attached);
|
||||
scheduler.update_node_ref_counts(*new_attached, RefCountUpdate::Attach);
|
||||
}
|
||||
self.attached = new_attached;
|
||||
}
|
||||
@@ -177,22 +179,27 @@ impl IntentState {
|
||||
/// secondary to attached while maintaining the scheduler's reference counts.
|
||||
pub(crate) fn promote_attached(
|
||||
&mut self,
|
||||
_scheduler: &mut Scheduler,
|
||||
scheduler: &mut Scheduler,
|
||||
promote_secondary: NodeId,
|
||||
) {
|
||||
// If we call this with a node that isn't in secondary, it would cause incorrect
|
||||
// scheduler reference counting, since we assume the node is already referenced as a secondary.
|
||||
debug_assert!(self.secondary.contains(&promote_secondary));
|
||||
|
||||
// TODO: when scheduler starts tracking attached + secondary counts separately, we will
|
||||
// need to call into it here.
|
||||
self.secondary.retain(|n| n != &promote_secondary);
|
||||
|
||||
let demoted = self.attached;
|
||||
self.attached = Some(promote_secondary);
|
||||
|
||||
scheduler.update_node_ref_counts(promote_secondary, RefCountUpdate::PromoteSecondary);
|
||||
if let Some(demoted) = demoted {
|
||||
scheduler.update_node_ref_counts(demoted, RefCountUpdate::DemoteAttached);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
|
||||
debug_assert!(!self.secondary.contains(&new_secondary));
|
||||
scheduler.node_inc_ref(new_secondary);
|
||||
scheduler.update_node_ref_counts(new_secondary, RefCountUpdate::AddSecondary);
|
||||
self.secondary.push(new_secondary);
|
||||
}
|
||||
|
||||
@@ -200,27 +207,27 @@ impl IntentState {
|
||||
pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
|
||||
let index = self.secondary.iter().position(|n| *n == node_id);
|
||||
if let Some(index) = index {
|
||||
scheduler.node_dec_ref(node_id);
|
||||
scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
|
||||
self.secondary.remove(index);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
|
||||
for secondary in self.secondary.drain(..) {
|
||||
scheduler.node_dec_ref(secondary);
|
||||
scheduler.update_node_ref_counts(secondary, RefCountUpdate::RemoveSecondary);
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove the last secondary node from the list of secondaries
|
||||
pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
|
||||
if let Some(node_id) = self.secondary.pop() {
|
||||
scheduler.node_dec_ref(node_id);
|
||||
scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
|
||||
if let Some(old_attached) = self.attached.take() {
|
||||
scheduler.node_dec_ref(old_attached);
|
||||
scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
|
||||
}
|
||||
|
||||
self.clear_secondary(scheduler);
|
||||
@@ -251,12 +258,11 @@ impl IntentState {
|
||||
/// forget the location on the offline node.
|
||||
///
|
||||
/// Returns true if a change was made
|
||||
pub(crate) fn demote_attached(&mut self, node_id: NodeId) -> bool {
|
||||
pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
|
||||
if self.attached == Some(node_id) {
|
||||
// TODO: when scheduler starts tracking attached + secondary counts separately, we will
|
||||
// need to call into it here.
|
||||
self.attached = None;
|
||||
self.secondary.push(node_id);
|
||||
scheduler.update_node_ref_counts(node_id, RefCountUpdate::DemoteAttached);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
@@ -307,6 +313,12 @@ pub(crate) struct ReconcilerWaiter {
|
||||
seq: Sequence,
|
||||
}
|
||||
|
||||
pub(crate) enum ReconcilerStatus {
|
||||
Done,
|
||||
Failed,
|
||||
InProgress,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum ReconcileWaitError {
|
||||
#[error("Timeout waiting for shard {0}")]
|
||||
@@ -369,6 +381,16 @@ impl ReconcilerWaiter {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn get_status(&self) -> ReconcilerStatus {
|
||||
if self.seq_wait.would_wait_for(self.seq).is_err() {
|
||||
ReconcilerStatus::Done
|
||||
} else if self.error_seq_wait.would_wait_for(self.seq).is_err() {
|
||||
ReconcilerStatus::Failed
|
||||
} else {
|
||||
ReconcilerStatus::InProgress
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Having spawned a reconciler task, the tenant shard's state will carry enough
|
||||
@@ -593,7 +615,7 @@ impl TenantShard {
|
||||
Secondary => {
|
||||
if let Some(node_id) = self.intent.get_attached() {
|
||||
// Populate secondary by demoting the attached node
|
||||
self.intent.demote_attached(*node_id);
|
||||
self.intent.demote_attached(scheduler, *node_id);
|
||||
modified = true;
|
||||
} else if self.intent.secondary.is_empty() {
|
||||
// Populate secondary by scheduling a fresh node
|
||||
@@ -648,13 +670,17 @@ impl TenantShard {
|
||||
let mut scores = all_pageservers
|
||||
.iter()
|
||||
.flat_map(|node_id| {
|
||||
if matches!(
|
||||
nodes
|
||||
.get(node_id)
|
||||
.map(|n| n.may_schedule())
|
||||
.unwrap_or(MaySchedule::No),
|
||||
MaySchedule::No
|
||||
let node = nodes.get(node_id);
|
||||
if node.is_none() {
|
||||
None
|
||||
} else if matches!(
|
||||
node.unwrap().get_scheduling(),
|
||||
NodeSchedulingPolicy::Filling
|
||||
) {
|
||||
// If the node is currently filling, don't count it as a candidate to avoid,
|
||||
// racing with the background fill.
|
||||
None
|
||||
} else if matches!(node.unwrap().may_schedule(), MaySchedule::No) {
|
||||
None
|
||||
} else {
|
||||
let affinity_score = schedule_context.get_node_affinity(*node_id);
|
||||
@@ -783,7 +809,7 @@ impl TenantShard {
|
||||
old_attached_node_id,
|
||||
new_attached_node_id,
|
||||
}) => {
|
||||
self.intent.demote_attached(old_attached_node_id);
|
||||
self.intent.demote_attached(scheduler, old_attached_node_id);
|
||||
self.intent
|
||||
.promote_attached(scheduler, new_attached_node_id);
|
||||
}
|
||||
@@ -1321,7 +1347,9 @@ pub(crate) mod tests {
|
||||
assert_ne!(attached_node_id, secondary_node_id);
|
||||
|
||||
// Notifying the attached node is offline should demote it to a secondary
|
||||
let changed = tenant_shard.intent.demote_attached(attached_node_id);
|
||||
let changed = tenant_shard
|
||||
.intent
|
||||
.demote_attached(&mut scheduler, attached_node_id);
|
||||
assert!(changed);
|
||||
assert!(tenant_shard.intent.attached.is_none());
|
||||
assert_eq!(tenant_shard.intent.secondary.len(), 2);
|
||||
|
||||
@@ -2213,6 +2213,30 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
def node_drain(self, node_id):
|
||||
log.info(f"node_drain({node_id})")
|
||||
self.request(
|
||||
"PUT",
|
||||
f"{self.env.storage_controller_api}/control/v1/node/{node_id}/drain",
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
def node_fill(self, node_id):
|
||||
log.info(f"node_fill({node_id})")
|
||||
self.request(
|
||||
"PUT",
|
||||
f"{self.env.storage_controller_api}/control/v1/node/{node_id}/fill",
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
def node_status(self, node_id):
|
||||
response = self.request(
|
||||
"GET",
|
||||
f"{self.env.storage_controller_api}/control/v1/node/{node_id}",
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
return response.json()
|
||||
|
||||
def node_list(self):
|
||||
response = self.request(
|
||||
"GET",
|
||||
|
||||
@@ -1477,3 +1477,110 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto
|
||||
workload = Workload(env, tenant_id, timeline, branch_name=branch)
|
||||
workload.expect_rows = expect_rows
|
||||
workload.validate()
|
||||
|
||||
|
||||
def test_storage_controller_drain_and_fill_smoke(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_pageservers = 2
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
tenant_count = 5
|
||||
shard_count_per_tenant = 8
|
||||
total_shards = tenant_count * shard_count_per_tenant
|
||||
tenant_ids = []
|
||||
|
||||
for _ in range(0, tenant_count):
|
||||
tid = TenantId.generate()
|
||||
tenant_ids.append(tid)
|
||||
env.neon_cli.create_tenant(
|
||||
tid, placement_policy='{"Attached":1}', shard_count=shard_count_per_tenant
|
||||
)
|
||||
|
||||
# Give things a chance to settle.
|
||||
# TODO(vlad): replace with a wait
|
||||
time.sleep(2)
|
||||
|
||||
nodes = env.storage_controller.node_list()
|
||||
assert len(nodes) == 2
|
||||
|
||||
def retryable_node_operation(op, ps_id, max_attempts, backoff):
|
||||
while max_attempts > 0:
|
||||
try:
|
||||
op(ps_id)
|
||||
return
|
||||
except StorageControllerApiException as e:
|
||||
max_attempts -= 1
|
||||
log.info(f"Operation failed ({max_attempts} attempts left): {e}")
|
||||
|
||||
if max_attempts == 0:
|
||||
raise e
|
||||
|
||||
time.sleep(backoff)
|
||||
|
||||
def poll_node_status(node_id, desired_scheduling_policy, max_attempts, backoff):
|
||||
log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy")
|
||||
while max_attempts > 0:
|
||||
try:
|
||||
status = env.storage_controller.node_status(node_id)
|
||||
policy = status["scheduling"]
|
||||
if policy == desired_scheduling_policy:
|
||||
return
|
||||
else:
|
||||
max_attempts -= 1
|
||||
log.info(f"Status call returned {policy=} ({max_attempts} attempts left)")
|
||||
|
||||
if max_attempts == 0:
|
||||
raise AssertionError(
|
||||
f"Status for {node_id=} did not reach {desired_scheduling_policy=}"
|
||||
)
|
||||
|
||||
time.sleep(backoff)
|
||||
except StorageControllerApiException as e:
|
||||
max_attempts -= 1
|
||||
log.info(f"Status call failed ({max_attempts} retries left): {e}")
|
||||
|
||||
if max_attempts == 0:
|
||||
raise e
|
||||
|
||||
time.sleep(backoff)
|
||||
|
||||
def assert_shard_counts_balanced(env: NeonEnv, shard_counts, total_shards):
|
||||
# Assert that all nodes have some attached shards
|
||||
assert len(shard_counts) == len(env.pageservers)
|
||||
|
||||
min_shard_count = min(shard_counts.values())
|
||||
max_shard_count = max(shard_counts.values())
|
||||
|
||||
flake_factor = 5 / 100
|
||||
assert max_shard_count - min_shard_count <= int(total_shards * flake_factor)
|
||||
|
||||
# Perform a graceful rolling restart
|
||||
for ps in env.pageservers:
|
||||
retryable_node_operation(
|
||||
lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2
|
||||
)
|
||||
poll_node_status(ps.id, "PauseForRestart", max_attempts=6, backoff=5)
|
||||
|
||||
shard_counts = get_node_shard_counts(env, tenant_ids)
|
||||
log.info(f"Shard counts after draining node {ps.id}: {shard_counts}")
|
||||
# Assert that we've drained the node
|
||||
assert shard_counts[str(ps.id)] == 0
|
||||
# Assert that those shards actually went somewhere
|
||||
assert sum(shard_counts.values()) == total_shards
|
||||
|
||||
ps.restart()
|
||||
poll_node_status(ps.id, "Active", max_attempts=10, backoff=1)
|
||||
|
||||
retryable_node_operation(
|
||||
lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2
|
||||
)
|
||||
poll_node_status(ps.id, "Active", max_attempts=6, backoff=5)
|
||||
|
||||
shard_counts = get_node_shard_counts(env, tenant_ids)
|
||||
log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")
|
||||
assert_shard_counts_balanced(env, shard_counts, total_shards)
|
||||
|
||||
# Now check that shards are reasonably balanced
|
||||
shard_counts = get_node_shard_counts(env, tenant_ids)
|
||||
log.info(f"Shard counts after rolling restart: {shard_counts}")
|
||||
assert_shard_counts_balanced(env, shard_counts, total_shards)
|
||||
|
||||
Reference in New Issue
Block a user