storcon: add background node operations controller skeleton

This commit is contained in:
Vlad Lazar
2024-06-11 14:03:45 +01:00
parent 126bcc3794
commit bb9c792813
3 changed files with 183 additions and 1 deletions

View File

@@ -0,0 +1,157 @@
use std::{borrow::Cow, collections::HashMap, fmt::Debug, fmt::Display, sync::Arc};
use tokio::{sync::mpsc::error::TrySendError, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use utils::id::NodeId;
use crate::service::Service;
pub(crate) const MAX_RECONCILES_PER_OPERATION: usize = 10;
#[derive(Copy, Clone)]
pub(crate) struct Drain {
node_id: NodeId,
}
#[derive(Copy, Clone)]
pub(crate) struct Fill {
node_id: NodeId,
}
pub(crate) enum Operation {
Drain(Drain),
Fill(Fill),
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum OperationError {
#[error("Operation precondition failed: {0}")]
PreconditionFailed(Cow<'static, str>),
#[error("Node state changed during operation: {0}")]
NodeStateChanged(Cow<'static, str>),
#[error("Operation cancelled")]
Cancelled,
#[error("Shutting down")]
ShuttingDown,
}
struct OperationHandler {
operation: Operation,
#[allow(unused)]
cancel: CancellationToken,
#[allow(unused)]
handle: JoinHandle<Result<(), OperationError>>,
}
#[derive(Default, Clone)]
struct OngoingOperations(Arc<std::sync::RwLock<HashMap<NodeId, OperationHandler>>>);
pub(crate) struct Controller {
ongoing: OngoingOperations,
service: Arc<Service>,
sender: tokio::sync::mpsc::Sender<Operation>,
}
impl Controller {
pub(crate) fn new(service: Arc<Service>) -> (Self, tokio::sync::mpsc::Receiver<Operation>) {
let (operations_tx, operations_rx) = tokio::sync::mpsc::channel(1);
(
Self {
ongoing: Default::default(),
service,
sender: operations_tx,
},
operations_rx,
)
}
pub(crate) fn drain_node(&self, node_id: NodeId) -> Result<(), OperationError> {
if let Some(handler) = self.ongoing.0.read().unwrap().get(&node_id) {
return Err(OperationError::PreconditionFailed(
format!(
"Background operation already ongoing for node: {}",
handler.operation
)
.into(),
));
}
self.sender.try_send(Operation::Drain(Drain { node_id }))?;
Ok(())
}
pub(crate) fn fill_node(&self, node_id: NodeId) -> Result<(), OperationError> {
if let Some(handler) = self.ongoing.0.read().unwrap().get(&node_id) {
return Err(OperationError::PreconditionFailed(
format!(
"Background operation already ongoing for node: {}",
handler.operation
)
.into(),
));
}
self.sender.try_send(Operation::Fill(Fill { node_id }))?;
Ok(())
}
pub(crate) async fn handle_operations(
&self,
mut receiver: tokio::sync::mpsc::Receiver<Operation>,
) {
while let Some(op) = receiver.recv().await {
match op {
Operation::Drain(drain) => self.handle_drain(drain),
Operation::Fill(fill) => self.handle_fill(fill),
}
}
}
fn handle_drain(&self, _drain: Drain) {
todo!("A later commit implements this stub");
}
fn handle_fill(&self, _fill: Fill) {
todo!("A later commit implements this stub")
}
}
impl<T> From<TrySendError<T>> for OperationError {
fn from(value: TrySendError<T>) -> Self {
match value {
TrySendError::Full(_) => {
Self::PreconditionFailed("Too many background operation in progress".into())
}
TrySendError::Closed(_) => Self::ShuttingDown,
}
}
}
impl Display for Drain {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "drain {}", self.node_id)
}
}
impl Display for Fill {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "fill {}", self.node_id)
}
}
impl Display for Operation {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Operation::Drain(op) => write!(f, "{op}"),
Operation::Fill(op) => write!(f, "{op}"),
}
}
}
impl Debug for Controller {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "backround_node_operations::Controller")
}
}

View File

@@ -2,6 +2,7 @@ use serde::Serialize;
use utils::seqwait::MonotonicCounter;
mod auth;
mod background_node_operations;
mod compute_hook;
mod heartbeater;
pub mod http;

View File

@@ -8,11 +8,12 @@ use std::{
};
use crate::{
background_node_operations::{self, Controller},
compute_hook::NotifyError,
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, WrappedWriteGuard},
persistence::{AbortShardSplitStatus, TenantFilter},
reconciler::{ReconcileError, ReconcileUnits},
scheduler::{ScheduleContext, ScheduleMode},
scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
tenant_shard::{
MigrateAttachment, ReconcileNeeded, ScheduleOptimization, ScheduleOptimizationAction,
},
@@ -275,6 +276,8 @@ pub struct Service {
/// use a VecDeque instead of a channel to reduce synchronization overhead, at the cost of some code complexity.
delayed_reconcile_tx: tokio::sync::mpsc::Sender<TenantShardId>,
background_operations_controller: std::sync::OnceLock<Controller>,
// Process shutdown will fire this token
cancel: CancellationToken,
@@ -1090,12 +1093,20 @@ impl Service {
delayed_reconcile_tx,
abort_tx,
startup_complete: startup_complete.clone(),
background_operations_controller: Default::default(),
cancel,
gate: Gate::default(),
tenant_op_locks: Default::default(),
node_op_locks: Default::default(),
});
let (controller, node_operations_receiver) =
background_node_operations::Controller::new(this.clone());
this.background_operations_controller
.set(controller)
.expect("This is the only code path that sets the controller");
let result_task_this = this.clone();
tokio::task::spawn(async move {
// Block shutdown until we're done (we must respect self.cancel)
@@ -1167,6 +1178,19 @@ impl Service {
}
});
tokio::task::spawn({
let this = this.clone();
let startup_complete = startup_complete.clone();
async move {
startup_complete.wait().await;
this.background_operations_controller
.get()
.expect("Initialized at start up")
.handle_operations(node_operations_receiver)
.await;
}
});
Ok(this)
}