Compare commits

...

1 Commits

Author SHA1 Message Date
Vlad Lazar
053235cf57 wip
wip

clippy

wip

test

wip
2024-06-11 12:09:37 +01:00
13 changed files with 952 additions and 53 deletions

Cargo.lock generated
View File

@@ -5801,6 +5801,7 @@ dependencies = [
"r2d2",
"reqwest 0.12.4",
"routerify",
"scopeguard",
"serde",
"serde_json",
"strum",

View File

@@ -209,6 +209,7 @@ pub enum NodeSchedulingPolicy {
Active,
Filling,
Pause,
PauseForRestart,
Draining,
}
@@ -220,6 +221,7 @@ impl FromStr for NodeSchedulingPolicy {
"active" => Ok(Self::Active),
"filling" => Ok(Self::Filling),
"pause" => Ok(Self::Pause),
"pause_for_restart" => Ok(Self::PauseForRestart),
"draining" => Ok(Self::Draining),
_ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")),
}
@@ -233,6 +235,7 @@ impl From<NodeSchedulingPolicy> for String {
Active => "active",
Filling => "filling",
Pause => "pause",
PauseForRestart => "pause_for_restart",
Draining => "draining",
}
.to_string()
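For orientation (not part of the diff itself): the persistence change later in this compare filters the `nodes` table by the string form of these policies, so the `FromStr` and `From<NodeSchedulingPolicy> for String` impls must remain a lossless round-trip. A minimal standalone sketch of that property, using a local stand-in enum rather than the real `pageserver_api` type:

```rust
use std::str::FromStr;

// Local stand-in mirroring pageserver_api::controller_api::NodeSchedulingPolicy
// and the string mapping added in this diff.
#[derive(Debug, Clone, Copy, PartialEq)]
enum NodeSchedulingPolicy {
    Active,
    Filling,
    Pause,
    PauseForRestart,
    Draining,
}

impl FromStr for NodeSchedulingPolicy {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "active" => Ok(Self::Active),
            "filling" => Ok(Self::Filling),
            "pause" => Ok(Self::Pause),
            "pause_for_restart" => Ok(Self::PauseForRestart),
            "draining" => Ok(Self::Draining),
            _ => Err(format!("Unknown scheduling state '{s}'")),
        }
    }
}

impl From<NodeSchedulingPolicy> for String {
    fn from(value: NodeSchedulingPolicy) -> Self {
        use NodeSchedulingPolicy::*;
        match value {
            Active => "active",
            Filling => "filling",
            Pause => "pause",
            PauseForRestart => "pause_for_restart",
            Draining => "draining",
        }
        .to_string()
    }
}

fn main() {
    // re_attach filters database rows by the serialized policy, so the
    // round-trip must be lossless for every variant, including the new one.
    let policy = NodeSchedulingPolicy::PauseForRestart;
    let stored: String = policy.into();
    assert_eq!(stored, "pause_for_restart");
    assert_eq!(NodeSchedulingPolicy::from_str(&stored), Ok(policy));
}
```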

View File

@@ -40,6 +40,7 @@ tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
measured.workspace = true
scopeguard.workspace = true
strum.workspace = true
strum_macros.workspace = true

View File

@@ -0,0 +1,236 @@
use std::{borrow::Cow, collections::HashMap, fmt::Debug, fmt::Display, sync::Arc};
use tokio::{sync::mpsc::error::TrySendError, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use utils::id::NodeId;
use crate::service::Service;
pub(crate) const MAX_RECONCILES_PER_OPERATION: usize = 10;
#[derive(Copy, Clone)]
pub(crate) struct Drain {
node_id: NodeId,
}
#[derive(Copy, Clone)]
pub(crate) struct Fill {
node_id: NodeId,
}
pub(crate) enum Operation {
Drain(Drain),
Fill(Fill),
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum OperationError {
#[error("Operation precondition failed: {0}")]
PreconditionFailed(Cow<'static, str>),
#[error("Node state changed during operation: {0}")]
NodeStateChanged(Cow<'static, str>),
#[error("Operation cancelled")]
Cancelled,
#[error("Shutting down")]
ShuttingDown,
}
struct OperationHandler {
operation: Operation,
#[allow(unused)]
cancel: CancellationToken,
// TODO(vlad): maybe we don't need this handle after all.
// Unless we want to log the error at the end.
#[allow(unused)]
handle: JoinHandle<Result<(), OperationError>>,
}
#[derive(Default, Clone)]
struct OngoingOperations(Arc<std::sync::RwLock<HashMap<NodeId, OperationHandler>>>);
pub(crate) struct Controller {
ongoing: OngoingOperations,
service: Arc<Service>,
sender: tokio::sync::mpsc::Sender<Operation>,
}
// TODO(vlad): write a little python test for this
impl Controller {
pub(crate) fn new(service: Arc<Service>) -> (Self, tokio::sync::mpsc::Receiver<Operation>) {
let (operations_tx, operations_rx) = tokio::sync::mpsc::channel(1);
(
Self {
ongoing: Default::default(),
service,
sender: operations_tx,
},
operations_rx,
)
}
pub(crate) fn drain_node(&self, node_id: NodeId) -> Result<(), OperationError> {
if let Some(handler) = self.ongoing.0.read().unwrap().get(&node_id) {
return Err(OperationError::PreconditionFailed(
format!(
"Background operation already ongoing for node: {}",
handler.operation
)
.into(),
));
}
self.sender.try_send(Operation::Drain(Drain { node_id }))?;
Ok(())
}
pub(crate) fn fill_node(&self, node_id: NodeId) -> Result<(), OperationError> {
if let Some(handler) = self.ongoing.0.read().unwrap().get(&node_id) {
return Err(OperationError::PreconditionFailed(
format!(
"Background operation already ongoing for node: {}",
handler.operation
)
.into(),
));
}
self.sender.try_send(Operation::Fill(Fill { node_id }))?;
Ok(())
}
pub(crate) async fn handle_operations(
&self,
mut receiver: tokio::sync::mpsc::Receiver<Operation>,
) {
while let Some(op) = receiver.recv().await {
match op {
Operation::Drain(drain) => self.handle_drain(drain),
Operation::Fill(fill) => self.handle_fill(fill),
}
}
}
fn handle_drain(&self, drain: Drain) {
let node_id = drain.node_id;
let cancel = CancellationToken::new();
let (_holder, waiter) = utils::completion::channel();
let handle = tokio::task::spawn({
let service = self.service.clone();
let ongoing = self.ongoing.clone();
let cancel = cancel.clone();
async move {
scopeguard::defer! {
let removed = ongoing.0.write().unwrap().remove(&drain.node_id);
if let Some(Operation::Drain(removed_drain)) = removed.map(|h| h.operation) {
assert_eq!(removed_drain.node_id, drain.node_id, "We always remove the same operation");
} else {
panic!("We always remove the same operation")
}
}
waiter.wait().await;
service.drain_node(drain.node_id, cancel).await
}
});
let replaced = self.ongoing.0.write().unwrap().insert(
node_id,
OperationHandler {
operation: Operation::Drain(drain),
cancel,
handle,
},
);
assert!(
replaced.is_none(),
"The channel size is 1 and we checked before enqueing"
);
}
fn handle_fill(&self, fill: Fill) {
let node_id = fill.node_id;
let cancel = CancellationToken::new();
let (_holder, waiter) = utils::completion::channel();
let handle = tokio::task::spawn({
let service = self.service.clone();
let ongoing = self.ongoing.clone();
let cancel = cancel.clone();
async move {
scopeguard::defer! {
let removed = ongoing.0.write().unwrap().remove(&fill.node_id);
if let Some(Operation::Fill(removed_fill)) = removed.map(|h| h.operation) {
assert_eq!(removed_fill.node_id, fill.node_id, "We always remove the same operation");
} else {
panic!("We always remove the same operation")
}
}
waiter.wait().await;
service.fill_node(fill.node_id, cancel).await
}
});
let replaced = self.ongoing.0.write().unwrap().insert(
node_id,
OperationHandler {
operation: Operation::Fill(fill),
cancel,
handle,
},
);
assert!(
replaced.is_none(),
"The channel size is 1 and we checked before enqueing"
);
}
}
impl<T> From<TrySendError<T>> for OperationError {
fn from(value: TrySendError<T>) -> Self {
match value {
TrySendError::Full(_) => {
Self::PreconditionFailed("Too many background operation in progress".into())
}
TrySendError::Closed(_) => Self::ShuttingDown,
}
}
}
impl Display for Drain {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "drain {}", self.node_id)
}
}
impl Display for Fill {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "fill {}", self.node_id)
}
}
impl Display for Operation {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Operation::Drain(op) => write!(f, "{op}"),
Operation::Fill(op) => write!(f, "{op}"),
}
}
}
impl Debug for Controller {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "backround_node_operations::Controller")
}
}
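The controller's queue is deliberately tiny: `tokio::sync::mpsc::channel(1)` plus the ongoing-operations map means at most one drain/fill can be pending, and a second request fails fast instead of queueing, with `TrySendError::Full` mapped to `OperationError::PreconditionFailed` and `Closed` to `ShuttingDown`. A small sketch of that backpressure behaviour using a plain bounded channel from the standard library (simplified types, not the real `Operation`/`OperationError`):

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Simplified stand-ins for the operations enqueued by the controller.
#[derive(Debug)]
enum Operation {
    Drain(u64),
    Fill(u64),
}

fn main() {
    // Capacity 1, matching the controller's tokio::sync::mpsc::channel(1).
    let (tx, rx) = sync_channel::<Operation>(1);

    // The first request is accepted and queued for the background handler.
    tx.try_send(Operation::Drain(1)).expect("first operation fits");

    // A second request while one is still queued fails fast; the controller
    // maps TrySendError::Full to OperationError::PreconditionFailed.
    match tx.try_send(Operation::Fill(2)) {
        Err(TrySendError::Full(_)) => {
            println!("rejected: too many background operations in progress")
        }
        other => panic!("expected Full, got {other:?}"),
    }

    // Once the handler drains the queue, new operations are accepted again.
    let accepted = rx.recv().unwrap();
    println!("handling {accepted:?}");
    tx.try_send(Operation::Fill(2)).expect("queue has room again");
}
```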

View File

@@ -480,6 +480,39 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>,
)
}
async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
let node_status = state.service.get_node_status(node_id).await?;
json_response(StatusCode::OK, node_status)
}
async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
state.service.start_node_drain(node_id).await?;
json_response(StatusCode::ACCEPTED, ())
}
async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
state.service.start_node_fill(node_id).await?;
json_response(StatusCode::ACCEPTED, ())
}
async fn handle_tenant_shard_split(
service: Arc<Service>,
mut req: Request<Body>,
@@ -832,6 +865,16 @@ pub fn make_router(
RequestName("control_v1_node_config"),
)
})
.get("/control/v1/node/:node_id", |r| {
named_request_span(r, handle_node_status, RequestName("control_v1_node_status"))
})
.put("/control/v1/node/:node_id/drain", |r| {
named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
})
.put("/control/v1/node/:node_id/fill", |r| {
named_request_span(r, handle_node_fill, RequestName("control_v1_node_fill"))
})
// TODO(vlad): endpoint for cancelling drain and fill
// Tenant Shard operations
.put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
tenant_service_handler(

View File

@@ -2,6 +2,7 @@ use serde::Serialize;
use utils::seqwait::MonotonicCounter;
mod auth;
mod background_node_operations;
mod compute_hook;
mod heartbeater;
pub mod http;

View File

@@ -59,12 +59,17 @@ impl Node {
self.id
}
pub(crate) fn get_scheduling(&self) -> NodeSchedulingPolicy {
self.scheduling
}
pub(crate) fn set_scheduling(&mut self, scheduling: NodeSchedulingPolicy) {
self.scheduling = scheduling
}
/// Does this registration request match `self`? This is used when deciding whether a registration
/// request should be allowed to update an existing record with the same node ID.
pub(crate) fn registration_match(&self, register_req: &NodeRegisterRequest) -> bool {
self.id == register_req.node_id
&& self.listen_http_addr == register_req.listen_http_addr
@@ -141,6 +146,7 @@ impl Node {
NodeSchedulingPolicy::Draining => MaySchedule::No,
NodeSchedulingPolicy::Filling => MaySchedule::Yes(score),
NodeSchedulingPolicy::Pause => MaySchedule::No,
NodeSchedulingPolicy::PauseForRestart => MaySchedule::No,
}
}
@@ -157,7 +163,7 @@ impl Node {
listen_http_port,
listen_pg_addr,
listen_pg_port,
scheduling: NodeSchedulingPolicy::Filling,
scheduling: NodeSchedulingPolicy::Active,
availability: NodeAvailability::Offline,
cancel: CancellationToken::new(),
}

View File

@@ -442,13 +442,15 @@ impl Persistence {
#[tracing::instrument(skip_all, fields(node_id))]
pub(crate) async fn re_attach(
&self,
node_id: NodeId,
input_node_id: NodeId,
) -> DatabaseResult<HashMap<TenantShardId, Generation>> {
use crate::schema::nodes::dsl::scheduling_policy;
use crate::schema::nodes::dsl::*;
use crate::schema::tenant_shards::dsl::*;
let updated = self
.with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
let rows_updated = diesel::update(tenant_shards)
.filter(generation_pageserver.eq(node_id.0 as i64))
.filter(generation_pageserver.eq(input_node_id.0 as i64))
.set(generation.eq(generation + 1))
.execute(conn)?;
@@ -457,9 +459,23 @@ impl Persistence {
// TODO: UPDATE+SELECT in one query
let updated = tenant_shards
.filter(generation_pageserver.eq(node_id.0 as i64))
.filter(generation_pageserver.eq(input_node_id.0 as i64))
.select(TenantShardPersistence::as_select())
.load(conn)?;
// If the node went through a drain and restart phase before re-attaching,
// then reset its scheduling policy to active.
diesel::update(nodes)
.filter(node_id.eq(input_node_id.0 as i64))
.filter(
scheduling_policy
// TODO(vlad): Do the same for `Filling`
.eq(String::from(NodeSchedulingPolicy::PauseForRestart))
.or(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Draining))),
)
.set(scheduling_policy.eq(String::from(NodeSchedulingPolicy::Active)))
.execute(conn)?;
Ok(updated)
})
.await?;

View File

@@ -29,6 +29,8 @@ pub enum MaySchedule {
struct SchedulerNode {
/// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
shard_count: usize,
/// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
attached_shard_count: usize,
/// Whether this node is currently eligible to have new shards scheduled (this is derived
/// from a node's availability state and scheduling policy).
@@ -42,7 +44,9 @@ impl PartialEq for SchedulerNode {
(MaySchedule::Yes(_), MaySchedule::Yes(_)) | (MaySchedule::No, MaySchedule::No)
);
may_schedule_matches && self.shard_count == other.shard_count
may_schedule_matches
&& self.shard_count == other.shard_count
&& self.attached_shard_count == other.attached_shard_count
}
}
@@ -138,6 +142,15 @@ impl ScheduleContext {
}
}
pub(crate) enum RefCountUpdate {
PromoteSecondary,
Attach,
Detach,
DemoteAttached,
AddSecondary,
RemoveSecondary,
}
impl Scheduler {
pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
let mut scheduler_nodes = HashMap::new();
@@ -146,6 +159,7 @@ impl Scheduler {
node.get_id(),
SchedulerNode {
shard_count: 0,
attached_shard_count: 0,
may_schedule: node.may_schedule(),
},
);
@@ -171,6 +185,7 @@ impl Scheduler {
node.get_id(),
SchedulerNode {
shard_count: 0,
attached_shard_count: 0,
may_schedule: node.may_schedule(),
},
);
@@ -179,7 +194,10 @@ impl Scheduler {
for shard in shards {
if let Some(node_id) = shard.intent.get_attached() {
match expect_nodes.get_mut(node_id) {
Some(node) => node.shard_count += 1,
Some(node) => {
node.shard_count += 1;
node.attached_shard_count += 1;
}
None => anyhow::bail!(
"Tenant {} references nonexistent node {}",
shard.tenant_shard_id,
@@ -227,31 +245,67 @@ impl Scheduler {
Ok(())
}
/// Increment the reference count of a node. This reference count is used to guide scheduling
/// decisions, not for memory management: it represents one tenant shard whose IntentState targets
/// this node.
/// Update the reference counts of a node. These reference counts are used to guide scheduling
/// decisions, not for memory management: they represent the number of tenant shards whose IntentState
/// targets this node and the number of tenant shards whose IntentState is attached to this
/// node.
///
/// It is an error to call this for a node that is not known to the scheduler (i.e. passed into
/// [`Self::new`] or [`Self::node_upsert`])
pub(crate) fn node_inc_ref(&mut self, node_id: NodeId) {
pub(crate) fn update_node_ref_counts(&mut self, node_id: NodeId, update: RefCountUpdate) {
let Some(node) = self.nodes.get_mut(&node_id) else {
tracing::error!("Scheduler missing node {node_id}");
debug_assert!(false);
tracing::error!("Scheduler missing node {node_id}");
return;
};
node.shard_count += 1;
match update {
RefCountUpdate::PromoteSecondary => {
node.attached_shard_count += 1;
}
RefCountUpdate::Attach => {
node.shard_count += 1;
node.attached_shard_count += 1;
}
RefCountUpdate::Detach => {
node.shard_count -= 1;
node.attached_shard_count -= 1;
}
RefCountUpdate::DemoteAttached => {
node.attached_shard_count -= 1;
}
RefCountUpdate::AddSecondary => {
node.shard_count += 1;
}
RefCountUpdate::RemoveSecondary => {
node.shard_count -= 1;
}
}
}
/// Decrement a node's reference count. Inverse of [`Self::node_inc_ref`].
pub(crate) fn node_dec_ref(&mut self, node_id: NodeId) {
let Some(node) = self.nodes.get_mut(&node_id) else {
// Check if the number of shards attached to a given node is lagging behind
// the cluster average. If that's the case, the node should be filled.
// TODO(vlad): We can probably be smarter about this. This could be expressed
// as the standard deviation of attached shards, excluding paused nodes.
pub(crate) fn should_fill_node(&self, node_id: NodeId) -> bool {
let Some(node) = self.nodes.get(&node_id) else {
debug_assert!(false);
tracing::error!("Scheduler missing node {node_id}");
return;
return true;
};
node.shard_count -= 1;
let total_attached_shards: usize =
self.nodes.values().map(|n| n.attached_shard_count).sum();
assert!(!self.nodes.is_empty());
let expected_attached_shards_per_node = total_attached_shards / self.nodes.len();
// TODO(vlad): remove this
for (node_id, node) in self.nodes.iter() {
tracing::info!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node);
}
node.attached_shard_count < expected_attached_shards_per_node
}
pub(crate) fn node_upsert(&mut self, node: &Node) {
@@ -263,6 +317,7 @@ impl Scheduler {
Vacant(entry) => {
entry.insert(SchedulerNode {
shard_count: 0,
attached_shard_count: 0,
may_schedule: node.may_schedule(),
});
}
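The scheduler now keeps two counters per node: `shard_count` (every intent that references the node, attached or secondary) and `attached_shard_count` (attachments only), and `should_fill_node` compares a node's attached count with the integer mean across the cluster. A self-contained sketch of that bookkeeping under simplified types (hypothetical `NodeCounts`/`apply`/`should_fill` names, not the real `Scheduler` API):

```rust
use std::collections::HashMap;

// Simplified per-node counters, mirroring SchedulerNode in this diff.
#[derive(Default, Debug)]
struct NodeCounts {
    shard_count: usize,
    attached_shard_count: usize,
}

enum RefCountUpdate {
    PromoteSecondary,
    Attach,
    Detach,
    DemoteAttached,
    AddSecondary,
    RemoveSecondary,
}

fn apply(counts: &mut NodeCounts, update: RefCountUpdate) {
    use RefCountUpdate::*;
    match update {
        // Attach/Detach move both counters: the node gains or loses a
        // reference and an attachment at the same time.
        Attach => {
            counts.shard_count += 1;
            counts.attached_shard_count += 1;
        }
        Detach => {
            counts.shard_count -= 1;
            counts.attached_shard_count -= 1;
        }
        // Promote/Demote swap a secondary and an attachment in place, so the
        // total reference count is unchanged.
        PromoteSecondary => counts.attached_shard_count += 1,
        DemoteAttached => counts.attached_shard_count -= 1,
        // Secondary locations only affect the total reference count.
        AddSecondary => counts.shard_count += 1,
        RemoveSecondary => counts.shard_count -= 1,
    }
}

// The fill heuristic: a node should be filled while its attached count is
// below the (integer) cluster-wide average of attached shards.
fn should_fill(nodes: &HashMap<u64, NodeCounts>, node_id: u64) -> bool {
    let total: usize = nodes.values().map(|n| n.attached_shard_count).sum();
    let expected = total / nodes.len();
    nodes[&node_id].attached_shard_count < expected
}

fn main() {
    let mut nodes: HashMap<u64, NodeCounts> = HashMap::new();
    nodes.insert(1, NodeCounts::default());
    nodes.insert(2, NodeCounts::default());

    // Attach four shards to node 1 with secondaries on node 2.
    for _ in 0..4 {
        apply(nodes.get_mut(&1).unwrap(), RefCountUpdate::Attach);
        apply(nodes.get_mut(&2).unwrap(), RefCountUpdate::AddSecondary);
    }
    assert!(should_fill(&nodes, 2));

    // Draining node 1 demotes two attachments and promotes the secondaries.
    for _ in 0..2 {
        apply(nodes.get_mut(&1).unwrap(), RefCountUpdate::DemoteAttached);
        apply(nodes.get_mut(&2).unwrap(), RefCountUpdate::PromoteSecondary);
    }
    assert!(!should_fill(&nodes, 2));
    println!("{nodes:?}");
}
```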

View File

@@ -8,13 +8,15 @@ use std::{
};
use crate::{
background_node_operations::{self, Controller, OperationError, MAX_RECONCILES_PER_OPERATION},
compute_hook::NotifyError,
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, WrappedWriteGuard},
persistence::{AbortShardSplitStatus, TenantFilter},
reconciler::{ReconcileError, ReconcileUnits},
scheduler::{ScheduleContext, ScheduleMode},
scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
tenant_shard::{
MigrateAttachment, ReconcileNeeded, ScheduleOptimization, ScheduleOptimizationAction,
MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization,
ScheduleOptimizationAction,
},
};
use anyhow::Context;
@@ -275,6 +277,8 @@ pub struct Service {
/// use a VecDeque instead of a channel to reduce synchronization overhead, at the cost of some code complexity.
delayed_reconcile_tx: tokio::sync::mpsc::Sender<TenantShardId>,
background_operations_controller: std::sync::OnceLock<Controller>,
// Process shutdown will fire this token
cancel: CancellationToken,
@@ -296,6 +300,19 @@ impl From<ReconcileWaitError> for ApiError {
}
}
impl From<OperationError> for ApiError {
fn from(value: OperationError) -> Self {
match value {
OperationError::PreconditionFailed(err) => ApiError::PreconditionFailed(err.into()),
OperationError::NodeStateChanged(err) => {
ApiError::InternalServerError(anyhow::anyhow!(err))
}
OperationError::ShuttingDown => ApiError::ShuttingDown,
OperationError::Cancelled => ApiError::Conflict("Operation was cancelled".into()),
}
}
}
#[allow(clippy::large_enum_variant)]
enum TenantCreateOrUpdate {
Create(TenantCreateRequest),
@@ -1090,12 +1107,20 @@ impl Service {
delayed_reconcile_tx,
abort_tx,
startup_complete: startup_complete.clone(),
background_operations_controller: Default::default(),
cancel,
gate: Gate::default(),
tenant_op_locks: Default::default(),
node_op_locks: Default::default(),
});
let (controller, node_operations_receiver) =
background_node_operations::Controller::new(this.clone());
this.background_operations_controller
.set(controller)
.expect("This is the only code path that sets the controller");
let result_task_this = this.clone();
tokio::task::spawn(async move {
// Block shutdown until we're done (we must respect self.cancel)
@@ -1167,6 +1192,19 @@ impl Service {
}
});
tokio::task::spawn({
let this = this.clone();
let startup_complete = startup_complete.clone();
async move {
startup_complete.wait().await;
this.background_operations_controller
.get()
.expect("Initialized at start up")
.handle_operations(node_operations_receiver)
.await;
}
});
Ok(this)
}
@@ -1562,15 +1600,30 @@ impl Service {
// Setting a node active unblocks any Reconcilers that might write to the location config API,
// but those requests will not be accepted by the node until it has finished processing
// the re-attach response.
//
// Additionally, reset the node's scheduling policy to match the conditional update done
// in [`Persistence::re_attach`].
if let Some(node) = nodes.get(&reattach_req.node_id) {
if !node.is_available() {
let reset_scheduling = matches!(
node.get_scheduling(),
NodeSchedulingPolicy::PauseForRestart | NodeSchedulingPolicy::Draining
);
if !node.is_available() || reset_scheduling {
let mut new_nodes = (**nodes).clone();
if let Some(node) = new_nodes.get_mut(&reattach_req.node_id) {
node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
if !node.is_available() {
node.set_availability(NodeAvailability::Active(UtilizationScore::worst()));
}
if reset_scheduling {
node.set_scheduling(NodeSchedulingPolicy::Active);
}
scheduler.node_upsert(node);
let new_nodes = Arc::new(new_nodes);
*nodes = new_nodes;
}
let new_nodes = Arc::new(new_nodes);
*nodes = new_nodes;
}
}
@@ -1851,6 +1904,23 @@ impl Service {
Ok(())
}
async fn kick_waiters(
&self,
waiters: Vec<ReconcilerWaiter>,
timeout: Duration,
) -> Vec<ReconcilerWaiter> {
let deadline = Instant::now().checked_add(timeout).unwrap();
for waiter in waiters.iter() {
let timeout = deadline.duration_since(Instant::now());
let _ = waiter.wait_timeout(timeout).await;
}
waiters
.into_iter()
.filter(|waiter| matches!(waiter.get_status(), ReconcilerStatus::InProgress))
.collect::<Vec<_>>()
}
/// Part of [`Self::tenant_location_config`]: dissect an incoming location config request,
/// and transform it into either a tenant creation or a series of shard updates.
///
@@ -4128,6 +4198,18 @@ impl Service {
Ok(nodes)
}
pub(crate) async fn get_node_status(&self, node_id: NodeId) -> Result<Node, ApiError> {
self.inner
.read()
.unwrap()
.nodes
.get(&node_id)
.cloned()
.ok_or(ApiError::NotFound(
format!("Node {node_id} not registered").into(),
))
}
pub(crate) async fn node_register(
&self,
register_req: NodeRegisterRequest,
@@ -4282,9 +4364,6 @@ impl Service {
if let Some(scheduling) = scheduling {
node.set_scheduling(scheduling);
// TODO: once we have a background scheduling ticker for fill/drain, kick it
// to wake up and start working.
}
// Update the scheduler, in case the eligibility of the node for new shards has changed
@@ -4312,7 +4391,7 @@ impl Service {
continue;
}
if tenant_shard.intent.demote_attached(node_id) {
if tenant_shard.intent.demote_attached(scheduler, node_id) {
tenant_shard.sequence = tenant_shard.sequence.next();
// TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters
@@ -4359,7 +4438,7 @@ impl Service {
// TODO: in the background, we should balance work back onto this pageserver
}
AvailabilityTransition::Unchanged => {
tracing::debug!("Node {} no change during config", node_id);
tracing::debug!("Node {} no availability change during config", node_id);
}
}
@@ -4368,6 +4447,112 @@ impl Service {
Ok(())
}
pub(crate) async fn start_node_drain(&self, node_id: NodeId) -> Result<(), ApiError> {
let (node_available, node_policy, schedulable_nodes_count) = {
let locked = self.inner.read().unwrap();
let nodes = &locked.nodes;
let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
anyhow::anyhow!("Node {} not registered", node_id).into(),
))?;
let schedulable_nodes_count = nodes
.iter()
.filter(|(_, n)| matches!(n.may_schedule(), MaySchedule::Yes(_)))
.count();
(
node.is_available(),
node.get_scheduling(),
schedulable_nodes_count,
)
};
if !node_available {
return Err(ApiError::ResourceUnavailable(
format!("Node {node_id} is currently unavailable").into(),
));
}
if schedulable_nodes_count == 0 {
return Err(ApiError::PreconditionFailed(
"No other schedulable nodes to drain to".into(),
));
}
match node_policy {
NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Pause => {
self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Draining))
.await?;
let controller = self
.background_operations_controller
.get()
.expect("Initialized at start up");
controller.drain_node(node_id)?;
}
NodeSchedulingPolicy::Draining => {
return Err(ApiError::Conflict(format!(
"Node {node_id} has drain in progress"
)));
}
policy => {
return Err(ApiError::PreconditionFailed(
format!("Node {node_id} cannot be drained due to {policy:?} policy").into(),
));
}
}
Ok(())
}
pub(crate) async fn start_node_fill(&self, node_id: NodeId) -> Result<(), ApiError> {
let (node_available, node_policy, total_nodes_count) = {
let locked = self.inner.read().unwrap();
let nodes = &locked.nodes;
let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
anyhow::anyhow!("Node {} not registered", node_id).into(),
))?;
(node.is_available(), node.get_scheduling(), nodes.len())
};
if !node_available {
return Err(ApiError::ResourceUnavailable(
format!("Node {node_id} is currently unavailable").into(),
));
}
if total_nodes_count <= 1 {
return Err(ApiError::PreconditionFailed(
"No other nodes to fill from".into(),
));
}
match node_policy {
// TODO: clear Draining, Filling and PauseForRestart on storage controller restart and
// re-attach
NodeSchedulingPolicy::Active => {
self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Filling))
.await?;
let controller = self
.background_operations_controller
.get()
.expect("Initialized at start up");
controller.fill_node(node_id)?;
}
NodeSchedulingPolicy::Filling => {
return Err(ApiError::Conflict(format!(
"Node {node_id} has fill in progress"
)));
}
policy => {
return Err(ApiError::PreconditionFailed(
format!("Node {node_id} cannot be filled due to {policy:?} policy").into(),
));
}
}
Ok(())
}
/// Helper for methods that will try and call pageserver APIs for
/// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
/// is attached somewhere.
@@ -4952,4 +5137,197 @@ impl Service {
// to complete.
self.gate.close().await;
}
pub(crate) async fn drain_node(
&self,
node_id: NodeId,
cancel: CancellationToken,
) -> Result<(), OperationError> {
tracing::info!(%node_id, "Starting drain background operation");
let last_inspected_shard: Option<TenantShardId> = None;
let mut inspected_all_shards = false;
let mut waiters = Vec::new();
let mut schedule_context = ScheduleContext::default();
while !inspected_all_shards {
if cancel.is_cancelled() {
return Err(OperationError::Cancelled);
}
{
let mut locked = self.inner.write().unwrap();
tracing::info!(%node_id, "Drain got write lock");
let (nodes, tenants, scheduler) = locked.parts_mut();
let node = nodes.get(&node_id).ok_or(OperationError::NodeStateChanged(
format!("node {node_id} was removed").into(),
))?;
let current_policy = node.get_scheduling();
if !matches!(current_policy, NodeSchedulingPolicy::Draining) {
// TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
// about it
return Err(OperationError::NodeStateChanged(
format!("node {node_id} changed state to {current_policy:?}").into(),
));
}
let mut cursor =
tenants
.iter_mut()
.skip_while(|(tid, _)| match last_inspected_shard {
Some(last) => **tid != last,
None => false,
});
while waiters.len() < MAX_RECONCILES_PER_OPERATION {
let (tid, tenant_shard) = match cursor.next() {
Some(some) => some,
None => {
inspected_all_shards = true;
break;
}
};
if tenant_shard.intent.demote_attached(scheduler, node_id) {
tenant_shard.sequence = tenant_shard.sequence.next();
match tenant_shard.schedule(scheduler, &mut schedule_context) {
Err(e) => {
tracing::warn!(%tid, "Scheduling error when draining pageserver {} : {e}", node_id);
}
Ok(()) => {
let waiter = self.maybe_reconcile_shard(tenant_shard, nodes);
if let Some(some) = waiter {
waiters.push(some);
}
}
}
}
}
}
tracing::info!(%node_id, "Drain released write lock");
waiters = self.kick_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await;
}
let _ = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await;
// At this point we have done the best we could to drain shards from this node.
// Set the node scheduling policy to [`NodeSchedulingPolicy::PauseForRestart`]
// to complete the drain.
if let Err(err) = self
.node_configure(node_id, None, Some(NodeSchedulingPolicy::PauseForRestart))
.await
{
// This is not fatal. Anything that is polling the node scheduling policy to detect
// the end of the drain operations will hang, but all such places should enforce an
// overall timeout. The scheduling policy will be updated upon node re-attach and/or
// by the counterpart fill operation.
tracing::warn!(%node_id, "Failed to finalise drain by setting scheduling policy: {err}");
}
tracing::info!(%node_id, "Completed drain background operation");
Ok(())
}
// TODO: exit early when we are balanced
pub(crate) async fn fill_node(
&self,
node_id: NodeId,
cancel: CancellationToken,
) -> Result<(), OperationError> {
// TODO(vlad): Currently this operates on the assumption that all
// secondaries are warm. This is not always true (we just migrated the
// tenant). Take that into consideration by checking the secondary status.
// TODO(vlad): This is a subset of the optimizations applied by `Service::optimize_all`.
// The optimizations should be stopped while filling, otherwise we are going to fight for
// the same reconciler semaphore units.
tracing::info!(%node_id, "Starting fill background operation");
let last_inspected_shard: Option<TenantShardId> = None;
let mut shards_left = true;
let mut waiters = Vec::new();
let mut should_fill_node = {
self.inner
.write()
.unwrap()
.scheduler
.should_fill_node(node_id)
};
while shards_left && should_fill_node {
if cancel.is_cancelled() {
return Err(OperationError::Cancelled);
}
{
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
let node = nodes.get(&node_id).ok_or(OperationError::NodeStateChanged(
format!("node {node_id} was removed").into(),
))?;
let current_policy = node.get_scheduling();
if !matches!(current_policy, NodeSchedulingPolicy::Filling) {
// TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
// about it
return Err(OperationError::NodeStateChanged(
format!("node {node_id} changed state to {current_policy:?}").into(),
));
}
let mut cursor =
tenants
.iter_mut()
.skip_while(|(tid, _)| match last_inspected_shard {
Some(last) => **tid != last,
None => false,
});
while waiters.len() < MAX_RECONCILES_PER_OPERATION {
let (_tid, tenant_shard) = match cursor.next() {
Some(some) => some,
None => {
shards_left = false;
break;
}
};
if tenant_shard.intent.get_secondary().contains(&node_id) {
tenant_shard.intent.promote_attached(scheduler, node_id);
let waiter = self.maybe_reconcile_shard(tenant_shard, nodes);
if let Some(some) = waiter {
waiters.push(some);
}
}
}
should_fill_node = scheduler.should_fill_node(node_id);
}
waiters = self.kick_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await;
}
let _ = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await;
if let Err(err) = self
.node_configure(node_id, None, Some(NodeSchedulingPolicy::Active))
.await
{
// This isn't a huge issue since the filling process starts upon request. However, it
// will prevent the next drain from starting. The only case in which this can fail
// is database unavailability. Such a case will require manual intervention.
tracing::error!(%node_id, "Failed to finalise fill by setting scheduling policy: {err}");
}
tracing::info!(%node_id, "Completed fill background operation");
Ok(())
}
}
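Both `drain_node` and `fill_node` walk the shard map in batches: take the lock, resume from the last inspected shard, queue up to `MAX_RECONCILES_PER_OPERATION` reconciles, drop the lock, and wait on the batch before continuing. A simplified, self-contained sketch of that resume-cursor batching shape (assuming the cursor advances after each batch; `maybe_reconcile` is a hypothetical stand-in, and all locking and reconciler plumbing is elided):

```rust
use std::collections::BTreeMap;

const MAX_RECONCILES_PER_OPERATION: usize = 10;

// Hypothetical stand-in for "this shard needed a reconcile on the drained node".
fn maybe_reconcile(shard_id: u32) -> bool {
    shard_id % 2 == 0
}

fn drain_in_batches(shards: &BTreeMap<u32, String>) {
    let mut last_inspected: Option<u32> = None;
    let mut inspected_all = false;

    while !inspected_all {
        // In the real loop this section runs under the service's write lock.
        let resume_after = last_inspected;
        let cursor = shards.iter().skip_while(|(id, _)| match resume_after {
            Some(last) => **id <= last,
            None => false,
        });

        let mut batch = Vec::new();
        for (id, _shard) in cursor {
            if batch.len() >= MAX_RECONCILES_PER_OPERATION {
                break; // enough reconciles queued; resume from here next round
            }
            last_inspected = Some(*id);
            if maybe_reconcile(*id) {
                batch.push(*id);
            }
        }

        // The cursor is exhausted once the last key has been inspected.
        inspected_all = last_inspected == shards.keys().next_back().copied();

        // Lock dropped here; the real code now waits on the batch (kick_waiters)
        // so that only a bounded number of reconciles is in flight at once.
        println!("reconciling batch {batch:?}");
    }
}

fn main() {
    let shards: BTreeMap<u32, String> =
        (0..35u32).map(|i| (i, format!("shard-{i}"))).collect();
    drain_in_batches(&shards);
}
```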

View File

@@ -8,9 +8,11 @@ use crate::{
metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
persistence::TenantShardPersistence,
reconciler::ReconcileUnits,
scheduler::{AffinityScore, MaySchedule, ScheduleContext},
scheduler::{AffinityScore, MaySchedule, RefCountUpdate, ScheduleContext},
};
use pageserver_api::controller_api::{
NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy,
};
use pageserver_api::controller_api::{PlacementPolicy, ShardSchedulingPolicy};
use pageserver_api::{
models::{LocationConfig, LocationConfigMode, TenantConfig},
shard::{ShardIdentity, TenantShardId},
@@ -153,7 +155,7 @@ impl IntentState {
}
pub(crate) fn single(scheduler: &mut Scheduler, node_id: Option<NodeId>) -> Self {
if let Some(node_id) = node_id {
scheduler.node_inc_ref(node_id);
scheduler.update_node_ref_counts(node_id, RefCountUpdate::Attach);
}
Self {
attached: node_id,
@@ -164,10 +166,10 @@ impl IntentState {
pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
if self.attached != new_attached {
if let Some(old_attached) = self.attached.take() {
scheduler.node_dec_ref(old_attached);
scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
}
if let Some(new_attached) = &new_attached {
scheduler.node_inc_ref(*new_attached);
scheduler.update_node_ref_counts(*new_attached, RefCountUpdate::Attach);
}
self.attached = new_attached;
}
@@ -177,22 +179,27 @@ impl IntentState {
/// secondary to attached while maintaining the scheduler's reference counts.
pub(crate) fn promote_attached(
&mut self,
_scheduler: &mut Scheduler,
scheduler: &mut Scheduler,
promote_secondary: NodeId,
) {
// If we call this with a node that isn't in secondary, it would cause incorrect
// scheduler reference counting, since we assume the node is already referenced as a secondary.
debug_assert!(self.secondary.contains(&promote_secondary));
// TODO: when scheduler starts tracking attached + secondary counts separately, we will
// need to call into it here.
self.secondary.retain(|n| n != &promote_secondary);
let demoted = self.attached;
self.attached = Some(promote_secondary);
scheduler.update_node_ref_counts(promote_secondary, RefCountUpdate::PromoteSecondary);
if let Some(demoted) = demoted {
scheduler.update_node_ref_counts(demoted, RefCountUpdate::DemoteAttached);
}
}
pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
debug_assert!(!self.secondary.contains(&new_secondary));
scheduler.node_inc_ref(new_secondary);
scheduler.update_node_ref_counts(new_secondary, RefCountUpdate::AddSecondary);
self.secondary.push(new_secondary);
}
@@ -200,27 +207,27 @@ impl IntentState {
pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
let index = self.secondary.iter().position(|n| *n == node_id);
if let Some(index) = index {
scheduler.node_dec_ref(node_id);
scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
self.secondary.remove(index);
}
}
pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
for secondary in self.secondary.drain(..) {
scheduler.node_dec_ref(secondary);
scheduler.update_node_ref_counts(secondary, RefCountUpdate::RemoveSecondary);
}
}
/// Remove the last secondary node from the list of secondaries
pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
if let Some(node_id) = self.secondary.pop() {
scheduler.node_dec_ref(node_id);
scheduler.update_node_ref_counts(node_id, RefCountUpdate::RemoveSecondary);
}
}
pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
if let Some(old_attached) = self.attached.take() {
scheduler.node_dec_ref(old_attached);
scheduler.update_node_ref_counts(old_attached, RefCountUpdate::Detach);
}
self.clear_secondary(scheduler);
@@ -251,12 +258,11 @@ impl IntentState {
/// forget the location on the offline node.
///
/// Returns true if a change was made
pub(crate) fn demote_attached(&mut self, node_id: NodeId) -> bool {
pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
if self.attached == Some(node_id) {
// TODO: when scheduler starts tracking attached + secondary counts separately, we will
// need to call into it here.
self.attached = None;
self.secondary.push(node_id);
scheduler.update_node_ref_counts(node_id, RefCountUpdate::DemoteAttached);
true
} else {
false
@@ -307,6 +313,12 @@ pub(crate) struct ReconcilerWaiter {
seq: Sequence,
}
pub(crate) enum ReconcilerStatus {
Done,
Failed,
InProgress,
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum ReconcileWaitError {
#[error("Timeout waiting for shard {0}")]
@@ -369,6 +381,16 @@ impl ReconcilerWaiter {
Ok(())
}
pub(crate) fn get_status(&self) -> ReconcilerStatus {
if self.seq_wait.would_wait_for(self.seq).is_err() {
ReconcilerStatus::Done
} else if self.error_seq_wait.would_wait_for(self.seq).is_err() {
ReconcilerStatus::Failed
} else {
ReconcilerStatus::InProgress
}
}
}
/// Having spawned a reconciler task, the tenant shard's state will carry enough
@@ -593,7 +615,7 @@ impl TenantShard {
Secondary => {
if let Some(node_id) = self.intent.get_attached() {
// Populate secondary by demoting the attached node
self.intent.demote_attached(*node_id);
self.intent.demote_attached(scheduler, *node_id);
modified = true;
} else if self.intent.secondary.is_empty() {
// Populate secondary by scheduling a fresh node
@@ -648,13 +670,17 @@ impl TenantShard {
let mut scores = all_pageservers
.iter()
.flat_map(|node_id| {
if matches!(
nodes
.get(node_id)
.map(|n| n.may_schedule())
.unwrap_or(MaySchedule::No),
MaySchedule::No
let node = nodes.get(node_id);
if node.is_none() {
None
} else if matches!(
node.unwrap().get_scheduling(),
NodeSchedulingPolicy::Filling
) {
// If the node is currently filling, don't count it as a candidate, to avoid
// racing with the background fill.
None
} else if matches!(node.unwrap().may_schedule(), MaySchedule::No) {
None
} else {
let affinity_score = schedule_context.get_node_affinity(*node_id);
@@ -783,7 +809,7 @@ impl TenantShard {
old_attached_node_id,
new_attached_node_id,
}) => {
self.intent.demote_attached(old_attached_node_id);
self.intent.demote_attached(scheduler, old_attached_node_id);
self.intent
.promote_attached(scheduler, new_attached_node_id);
}
@@ -1321,7 +1347,9 @@ pub(crate) mod tests {
assert_ne!(attached_node_id, secondary_node_id);
# Notifying that the attached node is offline should demote it to a secondary
let changed = tenant_shard.intent.demote_attached(attached_node_id);
let changed = tenant_shard
.intent
.demote_attached(&mut scheduler, attached_node_id);
assert!(changed);
assert!(tenant_shard.intent.attached.is_none());
assert_eq!(tenant_shard.intent.secondary.len(), 2);

View File

@@ -2213,6 +2213,30 @@ class NeonStorageController(MetricsGetter, LogUtils):
headers=self.headers(TokenScope.ADMIN),
)
def node_drain(self, node_id):
log.info(f"node_drain({node_id})")
self.request(
"PUT",
f"{self.env.storage_controller_api}/control/v1/node/{node_id}/drain",
headers=self.headers(TokenScope.ADMIN),
)
def node_fill(self, node_id):
log.info(f"node_fill({node_id})")
self.request(
"PUT",
f"{self.env.storage_controller_api}/control/v1/node/{node_id}/fill",
headers=self.headers(TokenScope.ADMIN),
)
def node_status(self, node_id):
response = self.request(
"GET",
f"{self.env.storage_controller_api}/control/v1/node/{node_id}",
headers=self.headers(TokenScope.ADMIN),
)
return response.json()
def node_list(self):
response = self.request(
"GET",

View File

@@ -1477,3 +1477,110 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto
workload = Workload(env, tenant_id, timeline, branch_name=branch)
workload.expect_rows = expect_rows
workload.validate()
def test_storage_controller_drain_and_fill_smoke(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
env.start()
tenant_count = 5
shard_count_per_tenant = 8
total_shards = tenant_count * shard_count_per_tenant
tenant_ids = []
for _ in range(0, tenant_count):
tid = TenantId.generate()
tenant_ids.append(tid)
env.neon_cli.create_tenant(
tid, placement_policy='{"Attached":1}', shard_count=shard_count_per_tenant
)
# Give things a chance to settle.
# TODO(vlad): replace with a wait
time.sleep(2)
nodes = env.storage_controller.node_list()
assert len(nodes) == 2
def retryable_node_operation(op, ps_id, max_attempts, backoff):
while max_attempts > 0:
try:
op(ps_id)
return
except StorageControllerApiException as e:
max_attempts -= 1
log.info(f"Operation failed ({max_attempts} attempts left): {e}")
if max_attempts == 0:
raise e
time.sleep(backoff)
def poll_node_status(node_id, desired_scheduling_policy, max_attempts, backoff):
log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy")
while max_attempts > 0:
try:
status = env.storage_controller.node_status(node_id)
policy = status["scheduling"]
if policy == desired_scheduling_policy:
return
else:
max_attempts -= 1
log.info(f"Status call returned {policy=} ({max_attempts} attempts left)")
if max_attempts == 0:
raise AssertionError(
f"Status for {node_id=} did not reach {desired_scheduling_policy=}"
)
time.sleep(backoff)
except StorageControllerApiException as e:
max_attempts -= 1
log.info(f"Status call failed ({max_attempts} retries left): {e}")
if max_attempts == 0:
raise e
time.sleep(backoff)
def assert_shard_counts_balanced(env: NeonEnv, shard_counts, total_shards):
# Assert that all nodes have some attached shards
assert len(shard_counts) == len(env.pageservers)
min_shard_count = min(shard_counts.values())
max_shard_count = max(shard_counts.values())
flake_factor = 5 / 100
assert max_shard_count - min_shard_count <= int(total_shards * flake_factor)
# Perform a graceful rolling restart
for ps in env.pageservers:
retryable_node_operation(
lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2
)
poll_node_status(ps.id, "PauseForRestart", max_attempts=6, backoff=5)
shard_counts = get_node_shard_counts(env, tenant_ids)
log.info(f"Shard counts after draining node {ps.id}: {shard_counts}")
# Assert that we've drained the node
assert shard_counts[str(ps.id)] == 0
# Assert that those shards actually went somewhere
assert sum(shard_counts.values()) == total_shards
ps.restart()
poll_node_status(ps.id, "Active", max_attempts=10, backoff=1)
retryable_node_operation(
lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2
)
poll_node_status(ps.id, "Active", max_attempts=6, backoff=5)
shard_counts = get_node_shard_counts(env, tenant_ids)
log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")
assert_shard_counts_balanced(env, shard_counts, total_shards)
# Now check that shards are reasonably balanced
shard_counts = get_node_shard_counts(env, tenant_ids)
log.info(f"Shard counts after rolling restart: {shard_counts}")
assert_shard_counts_balanced(env, shard_counts, total_shards)