mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-03 05:20:38 +00:00
review: remove Controller entity
This commit is contained in:
@@ -1,23 +1,21 @@
|
||||
use std::{borrow::Cow, collections::HashMap, fmt::Debug, fmt::Display, sync::Arc};
|
||||
use std::{borrow::Cow, fmt::Debug, fmt::Display};
|
||||
|
||||
use tokio::{sync::mpsc::error::TrySendError, task::JoinHandle};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::id::NodeId;
|
||||
|
||||
use crate::service::Service;
|
||||
|
||||
/// Crate-wide limit of 10 associated with background node operations.
// NOTE(review): presumably the maximum number of reconciles a single drain/fill
// operation may have in flight; no use site is visible in this listing (only the
// import in the service module) — TODO confirm.
pub(crate) const MAX_RECONCILES_PER_OPERATION: usize = 10;
|
||||
|
||||
/// Request to drain the node identified by `node_id`; wrapped by `Operation::Drain`.
#[derive(Copy, Clone)]
|
||||
pub(crate) struct Drain {
|
||||
// NOTE(review): this listing is a rendered diff. The two `node_id` rows are
// presumably the old (private) and new (`pub(crate)`) versions of one field;
// the service module builds `Drain { node_id }` directly, which needs the latter.
node_id: NodeId,
|
||||
pub(crate) node_id: NodeId,
|
||||
}
|
||||
|
||||
/// Request to fill the node identified by `node_id`; wrapped by `Operation::Fill`.
#[derive(Copy, Clone)]
|
||||
pub(crate) struct Fill {
|
||||
// NOTE(review): this listing is a rendered diff. The two `node_id` rows are
// presumably the old (private) and new (`pub(crate)`) versions of one field;
// the service module builds `Fill { node_id }` directly, which needs the latter.
node_id: NodeId,
|
||||
pub(crate) node_id: NodeId,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
pub(crate) enum Operation {
|
||||
Drain(Drain),
|
||||
Fill(Fill),
|
||||
@@ -25,184 +23,16 @@ pub(crate) enum Operation {
|
||||
|
||||
/// Errors reported when starting or running a background node operation.
///
/// The service module converts these into `ApiError` values.
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum OperationError {
|
||||
/// A requirement for starting the operation was not met. In this file it is
/// produced when an operation is already registered for the node and when the
/// operations channel is full.
#[error("Operation precondition failed: {0}")]
|
||||
PreconditionFailed(Cow<'static, str>),
|
||||
/// The node's state changed while the operation was running.
// NOTE(review): no producer of this variant is visible in this listing.
#[error("Node state changed during operation: {0}")]
|
||||
NodeStateChanged(Cow<'static, str>),
|
||||
/// The operation was cancelled.
#[error("Operation cancelled")]
|
||||
Cancelled,
|
||||
/// Produced in this file when the operations channel is closed.
#[error("Shutting down")]
|
||||
ShuttingDown,
|
||||
}
|
||||
|
||||
/// Bookkeeping for one background operation: the operation itself, the
/// cancellation token whose clone is handed to the worker, and the join handle
/// of the spawned task.
// NOTE(review): this listing is a rendered diff. The header and `operation`
// rows appear twice, presumably as the old (private) and new (`pub(crate)`)
// versions of the same declaration.
struct OperationHandler {
|
||||
operation: Operation,
|
||||
pub(crate) struct OperationHandler {
|
||||
pub(crate) operation: Operation,
|
||||
// Stored but never read back within this file, hence the lint suppression.
#[allow(unused)]
|
||||
cancel: CancellationToken,
|
||||
// Stored but never awaited within this file, hence the lint suppression.
#[allow(unused)]
|
||||
handle: JoinHandle<Result<(), OperationError>>,
|
||||
}
|
||||
|
||||
/// Lock-protected map from node id to the handler of the operation registered
/// for that node. The map sits behind an `Arc`, so clones refer to the same map.
#[derive(Default, Clone)]
|
||||
struct OngoingOperations(Arc<std::sync::RwLock<HashMap<NodeId, OperationHandler>>>);
|
||||
|
||||
/// Accepts drain/fill requests, queues them on a channel and tracks the
/// operations that have been started.
pub(crate) struct Controller {
|
||||
// Operations currently registered, keyed by node id.
ongoing: OngoingOperations,
|
||||
// Service whose `drain_node` / `fill_node` perform the actual work.
service: Arc<Service>,
|
||||
// Sending half of the operations channel; `new` creates it with capacity 1.
sender: tokio::sync::mpsc::Sender<Operation>,
|
||||
}
|
||||
|
||||
/// Queueing (`drain_node`, `fill_node`) and execution (`handle_operations`)
/// of background node operations.
impl Controller {
|
||||
/// Creates a controller for `service` together with the receiving half of its
/// operations channel. The receiver is meant to be passed to
/// [`Controller::handle_operations`].
pub(crate) fn new(service: Arc<Service>) -> (Self, tokio::sync::mpsc::Receiver<Operation>) {
|
||||
// Capacity 1: at most one request can be queued at a time.
let (operations_tx, operations_rx) = tokio::sync::mpsc::channel(1);
|
||||
(
|
||||
Self {
|
||||
ongoing: Default::default(),
|
||||
service,
|
||||
sender: operations_tx,
|
||||
},
|
||||
operations_rx,
|
||||
)
|
||||
}
|
||||
|
||||
/// Queues a drain of `node_id` without blocking.
///
/// # Errors
/// - `OperationError::PreconditionFailed` if an operation is already
///   registered for this node, or if the channel is full.
/// - `OperationError::ShuttingDown` if the channel is closed.
pub(crate) fn drain_node(&self, node_id: NodeId) -> Result<(), OperationError> {
|
||||
// Refuse when this node already has a registered operation.
if let Some(handler) = self.ongoing.0.read().unwrap().get(&node_id) {
|
||||
return Err(OperationError::PreconditionFailed(
|
||||
format!(
|
||||
"Background operation already ongoing for node: {}",
|
||||
handler.operation
|
||||
)
|
||||
.into(),
|
||||
));
|
||||
}
|
||||
|
||||
// `?` converts the `TrySendError` through the `From` impl in this file.
self.sender.try_send(Operation::Drain(Drain { node_id }))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Queues a fill of `node_id` without blocking.
///
/// # Errors
/// - `OperationError::PreconditionFailed` if an operation is already
///   registered for this node, or if the channel is full.
/// - `OperationError::ShuttingDown` if the channel is closed.
pub(crate) fn fill_node(&self, node_id: NodeId) -> Result<(), OperationError> {
|
||||
// Refuse when this node already has a registered operation.
if let Some(handler) = self.ongoing.0.read().unwrap().get(&node_id) {
|
||||
return Err(OperationError::PreconditionFailed(
|
||||
format!(
|
||||
"Background operation already ongoing for node: {}",
|
||||
handler.operation
|
||||
)
|
||||
.into(),
|
||||
));
|
||||
}
|
||||
|
||||
// `?` converts the `TrySendError` through the `From` impl in this file.
self.sender.try_send(Operation::Fill(Fill { node_id }))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Receives queued operations and dispatches each to `handle_drain` or
/// `handle_fill`. Returns once `recv` yields `None`.
pub(crate) async fn handle_operations(
|
||||
&self,
|
||||
mut receiver: tokio::sync::mpsc::Receiver<Operation>,
|
||||
) {
|
||||
while let Some(op) = receiver.recv().await {
|
||||
match op {
|
||||
Operation::Drain(drain) => self.handle_drain(drain),
|
||||
Operation::Fill(fill) => self.handle_fill(fill),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawns a task that runs `Service::drain_node` for `drain.node_id` and
/// registers its handler in `ongoing`.
///
/// # Panics
/// Panics if an entry for the node already exists when the handler is inserted.
fn handle_drain(&self, drain: Drain) {
|
||||
let node_id = drain.node_id;
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
// `_holder` is a named binding, so it lives until this function returns.
// Presumably `waiter.wait()` in the task only completes once it is dropped,
// keeping the task from running before the map insert below — TODO confirm
// against `utils::completion`.
let (_holder, waiter) = utils::completion::channel();
|
||||
|
||||
let handle = tokio::task::spawn({
|
||||
let service = self.service.clone();
|
||||
let ongoing = self.ongoing.clone();
|
||||
let cancel = cancel.clone();
|
||||
|
||||
async move {
|
||||
// Deferred cleanup: remove this node's entry and check that it was
// the drain registered here; anything else panics.
scopeguard::defer! {
|
||||
let removed = ongoing.0.write().unwrap().remove(&drain.node_id);
|
||||
if let Some(Operation::Drain(removed_drain)) = removed.map(|h| h.operation) {
|
||||
assert_eq!(removed_drain.node_id, drain.node_id, "We always remove the same operation");
|
||||
} else {
|
||||
panic!("We always remove the same operation")
|
||||
}
|
||||
}
|
||||
|
||||
waiter.wait().await;
|
||||
service.drain_node(drain.node_id, cancel).await
|
||||
}
|
||||
});
|
||||
|
||||
// Register the handler under the node id; `insert` returns any previous entry.
let replaced = self.ongoing.0.write().unwrap().insert(
|
||||
node_id,
|
||||
OperationHandler {
|
||||
operation: Operation::Drain(drain),
|
||||
cancel,
|
||||
handle,
|
||||
},
|
||||
);
|
||||
|
||||
assert!(
|
||||
replaced.is_none(),
|
||||
"The channel size is 1 and we checked before enqueing"
|
||||
);
|
||||
}
|
||||
|
||||
/// Spawns a task that runs `Service::fill_node` for `fill.node_id` and
/// registers its handler in `ongoing`.
///
/// # Panics
/// Panics if an entry for the node already exists when the handler is inserted.
fn handle_fill(&self, fill: Fill) {
|
||||
let node_id = fill.node_id;
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
// `_holder` is a named binding, so it lives until this function returns.
// Presumably `waiter.wait()` in the task only completes once it is dropped,
// keeping the task from running before the map insert below — TODO confirm
// against `utils::completion`.
let (_holder, waiter) = utils::completion::channel();
|
||||
|
||||
let handle = tokio::task::spawn({
|
||||
let service = self.service.clone();
|
||||
let ongoing = self.ongoing.clone();
|
||||
let cancel = cancel.clone();
|
||||
|
||||
async move {
|
||||
// Deferred cleanup: remove this node's entry and check that it was
// the fill registered here; anything else panics.
scopeguard::defer! {
|
||||
let removed = ongoing.0.write().unwrap().remove(&fill.node_id);
|
||||
if let Some(Operation::Fill(removed_fill)) = removed.map(|h| h.operation) {
|
||||
assert_eq!(removed_fill.node_id, fill.node_id, "We always remove the same operation");
|
||||
} else {
|
||||
panic!("We always remove the same operation")
|
||||
}
|
||||
}
|
||||
|
||||
waiter.wait().await;
|
||||
service.fill_node(fill.node_id, cancel).await
|
||||
}
|
||||
});
|
||||
|
||||
// Register the handler under the node id; `insert` returns any previous entry.
let replaced = self.ongoing.0.write().unwrap().insert(
|
||||
node_id,
|
||||
OperationHandler {
|
||||
operation: Operation::Fill(fill),
|
||||
cancel,
|
||||
handle,
|
||||
},
|
||||
);
|
||||
|
||||
assert!(
|
||||
replaced.is_none(),
|
||||
"The channel size is 1 and we checked before enqueing"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts a failed `try_send` on an mpsc channel into an `OperationError`,
/// which lets `drain_node` / `fill_node` use `?` on the send.
impl<T> From<TrySendError<T>> for OperationError {
|
||||
fn from(value: TrySendError<T>) -> Self {
|
||||
match value {
|
||||
// Channel at capacity: reported as a failed precondition.
TrySendError::Full(_) => {
|
||||
Self::PreconditionFailed("Too many background operation in progress".into())
|
||||
}
|
||||
// Channel closed: reported as shutting down.
TrySendError::Closed(_) => Self::ShuttingDown,
|
||||
}
|
||||
}
|
||||
// NOTE(review): this listing is a rendered diff. The field row below does not
// belong to this impl; it is presumably the added `cancel` field of the new
// `OperationHandler` struct — TODO confirm against the actual file.
pub(crate) cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl Display for Drain {
|
||||
@@ -225,9 +55,3 @@ impl Display for Operation {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Opaque `Debug` output: writes a fixed label and none of the fields.
impl Debug for Controller {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
// NOTE(review): the label spells the module `backround_…`, whereas the
// service module imports it as `background_node_operations`.
write!(f, "backround_node_operations::Controller")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,9 @@ use std::{
|
||||
};
|
||||
|
||||
use crate::{
|
||||
background_node_operations::{self, Controller, OperationError, MAX_RECONCILES_PER_OPERATION},
|
||||
background_node_operations::{
|
||||
Drain, Fill, Operation, OperationError, OperationHandler, MAX_RECONCILES_PER_OPERATION,
|
||||
},
|
||||
compute_hook::NotifyError,
|
||||
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, WrappedWriteGuard},
|
||||
persistence::{AbortShardSplitStatus, TenantFilter},
|
||||
@@ -136,6 +138,8 @@ struct ServiceState {
|
||||
|
||||
scheduler: Scheduler,
|
||||
|
||||
ongoing_operation: Option<OperationHandler>,
|
||||
|
||||
/// Queue of tenants who are waiting for concurrency limits to permit them to reconcile
|
||||
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
|
||||
}
|
||||
@@ -187,6 +191,7 @@ impl ServiceState {
|
||||
tenants,
|
||||
nodes: Arc::new(nodes),
|
||||
scheduler,
|
||||
ongoing_operation: None,
|
||||
delayed_reconcile_rx,
|
||||
}
|
||||
}
|
||||
@@ -277,8 +282,6 @@ pub struct Service {
|
||||
/// use a VecDeque instead of a channel to reduce synchronization overhead, at the cost of some code complexity.
|
||||
delayed_reconcile_tx: tokio::sync::mpsc::Sender<TenantShardId>,
|
||||
|
||||
background_operations_controller: std::sync::OnceLock<Controller>,
|
||||
|
||||
// Process shutdown will fire this token
|
||||
cancel: CancellationToken,
|
||||
|
||||
@@ -303,11 +306,9 @@ impl From<ReconcileWaitError> for ApiError {
|
||||
impl From<OperationError> for ApiError {
|
||||
fn from(value: OperationError) -> Self {
|
||||
match value {
|
||||
OperationError::PreconditionFailed(err) => ApiError::PreconditionFailed(err.into()),
|
||||
OperationError::NodeStateChanged(err) => {
|
||||
ApiError::InternalServerError(anyhow::anyhow!(err))
|
||||
}
|
||||
OperationError::ShuttingDown => ApiError::ShuttingDown,
|
||||
OperationError::Cancelled => ApiError::Conflict("Operation was cancelled".into()),
|
||||
}
|
||||
}
|
||||
@@ -1107,20 +1108,12 @@ impl Service {
|
||||
delayed_reconcile_tx,
|
||||
abort_tx,
|
||||
startup_complete: startup_complete.clone(),
|
||||
background_operations_controller: Default::default(),
|
||||
cancel,
|
||||
gate: Gate::default(),
|
||||
tenant_op_locks: Default::default(),
|
||||
node_op_locks: Default::default(),
|
||||
});
|
||||
|
||||
let (controller, node_operations_receiver) =
|
||||
background_node_operations::Controller::new(this.clone());
|
||||
|
||||
this.background_operations_controller
|
||||
.set(controller)
|
||||
.expect("This is the only code path that sets the controller");
|
||||
|
||||
let result_task_this = this.clone();
|
||||
tokio::task::spawn(async move {
|
||||
// Block shutdown until we're done (we must respect self.cancel)
|
||||
@@ -1192,19 +1185,6 @@ impl Service {
|
||||
}
|
||||
});
|
||||
|
||||
tokio::task::spawn({
|
||||
let this = this.clone();
|
||||
let startup_complete = startup_complete.clone();
|
||||
async move {
|
||||
startup_complete.wait().await;
|
||||
this.background_operations_controller
|
||||
.get()
|
||||
.expect("Initialized at start up")
|
||||
.handle_operations(node_operations_receiver)
|
||||
.await;
|
||||
}
|
||||
});
|
||||
|
||||
Ok(this)
|
||||
}
|
||||
|
||||
@@ -4451,8 +4431,11 @@ impl Service {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn start_node_drain(&self, node_id: NodeId) -> Result<(), ApiError> {
|
||||
let (node_available, node_policy, schedulable_nodes_count) = {
|
||||
pub(crate) async fn start_node_drain(
|
||||
self: &Arc<Self>,
|
||||
node_id: NodeId,
|
||||
) -> Result<(), ApiError> {
|
||||
let (ongoing_op, node_available, node_policy, schedulable_nodes_count) = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let nodes = &locked.nodes;
|
||||
let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
|
||||
@@ -4464,12 +4447,22 @@ impl Service {
|
||||
.count();
|
||||
|
||||
(
|
||||
locked
|
||||
.ongoing_operation
|
||||
.as_ref()
|
||||
.map(|ongoing| ongoing.operation),
|
||||
node.is_available(),
|
||||
node.get_scheduling(),
|
||||
schedulable_nodes_count,
|
||||
)
|
||||
};
|
||||
|
||||
if let Some(ongoing) = ongoing_op {
|
||||
return Err(ApiError::PreconditionFailed(
|
||||
format!("Background operation already ongoing for node: {}", ongoing).into(),
|
||||
));
|
||||
}
|
||||
|
||||
if !node_available {
|
||||
return Err(ApiError::ResourceUnavailable(
|
||||
format!("Node {node_id} is currently unavailable").into(),
|
||||
@@ -4486,11 +4479,30 @@ impl Service {
|
||||
NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Pause => {
|
||||
self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Draining))
|
||||
.await?;
|
||||
let controller = self
|
||||
.background_operations_controller
|
||||
.get()
|
||||
.expect("Initialized at start up");
|
||||
controller.drain_node(node_id)?;
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
self.inner.write().unwrap().ongoing_operation = Some(OperationHandler {
|
||||
operation: Operation::Drain(Drain { node_id }),
|
||||
cancel: cancel.clone(),
|
||||
});
|
||||
|
||||
tokio::task::spawn({
|
||||
let service = self.clone();
|
||||
let cancel = cancel.clone();
|
||||
async move {
|
||||
scopeguard::defer! {
|
||||
let prev = service.inner.write().unwrap().ongoing_operation.take();
|
||||
|
||||
if let Some(Operation::Drain(removed_drain)) = prev.map(|h| h.operation) {
|
||||
assert_eq!(removed_drain.node_id, node_id, "We always take the same operation");
|
||||
} else {
|
||||
panic!("We always remove the same operation")
|
||||
}
|
||||
}
|
||||
service.drain_node(node_id, cancel).await
|
||||
}
|
||||
});
|
||||
}
|
||||
NodeSchedulingPolicy::Draining => {
|
||||
return Err(ApiError::Conflict(format!(
|
||||
@@ -4507,17 +4519,31 @@ impl Service {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn start_node_fill(&self, node_id: NodeId) -> Result<(), ApiError> {
|
||||
let (node_available, node_policy, total_nodes_count) = {
|
||||
pub(crate) async fn start_node_fill(self: &Arc<Self>, node_id: NodeId) -> Result<(), ApiError> {
|
||||
let (ongoing_op, node_available, node_policy, total_nodes_count) = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let nodes = &locked.nodes;
|
||||
let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
|
||||
anyhow::anyhow!("Node {} not registered", node_id).into(),
|
||||
))?;
|
||||
|
||||
(node.is_available(), node.get_scheduling(), nodes.len())
|
||||
(
|
||||
locked
|
||||
.ongoing_operation
|
||||
.as_ref()
|
||||
.map(|ongoing| ongoing.operation),
|
||||
node.is_available(),
|
||||
node.get_scheduling(),
|
||||
nodes.len(),
|
||||
)
|
||||
};
|
||||
|
||||
if let Some(ongoing) = ongoing_op {
|
||||
return Err(ApiError::PreconditionFailed(
|
||||
format!("Background operation already ongoing for node: {}", ongoing).into(),
|
||||
));
|
||||
}
|
||||
|
||||
if !node_available {
|
||||
return Err(ApiError::ResourceUnavailable(
|
||||
format!("Node {node_id} is currently unavailable").into(),
|
||||
@@ -4534,11 +4560,31 @@ impl Service {
|
||||
NodeSchedulingPolicy::Active => {
|
||||
self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Filling))
|
||||
.await?;
|
||||
let controller = self
|
||||
.background_operations_controller
|
||||
.get()
|
||||
.expect("Initialized at start up");
|
||||
controller.fill_node(node_id)?;
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
self.inner.write().unwrap().ongoing_operation = Some(OperationHandler {
|
||||
operation: Operation::Fill(Fill { node_id }),
|
||||
cancel: cancel.clone(),
|
||||
});
|
||||
|
||||
tokio::task::spawn({
|
||||
let service = self.clone();
|
||||
let cancel = cancel.clone();
|
||||
async move {
|
||||
scopeguard::defer! {
|
||||
let prev = service.inner.write().unwrap().ongoing_operation.take();
|
||||
|
||||
if let Some(Operation::Fill(removed_fill)) = prev.map(|h| h.operation) {
|
||||
assert_eq!(removed_fill.node_id, node_id, "We always take the same operation");
|
||||
} else {
|
||||
panic!("We always remove the same operation")
|
||||
}
|
||||
}
|
||||
|
||||
service.fill_node(node_id, cancel).await
|
||||
}
|
||||
});
|
||||
}
|
||||
NodeSchedulingPolicy::Filling => {
|
||||
return Err(ApiError::Conflict(format!(
|
||||
|
||||
Reference in New Issue
Block a user