From bb9c792813b107848b453533f11635b0129e17d4 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Tue, 11 Jun 2024 14:03:45 +0100 Subject: [PATCH] storcon: add background node operations controller skeleton --- .../src/background_node_operations.rs | 157 ++++++++++++++++++ storage_controller/src/lib.rs | 1 + storage_controller/src/service.rs | 26 ++- 3 files changed, 183 insertions(+), 1 deletion(-) create mode 100644 storage_controller/src/background_node_operations.rs diff --git a/storage_controller/src/background_node_operations.rs b/storage_controller/src/background_node_operations.rs new file mode 100644 index 0000000000..cbcd1e24b6 --- /dev/null +++ b/storage_controller/src/background_node_operations.rs @@ -0,0 +1,157 @@ +use std::{borrow::Cow, collections::HashMap, fmt::Debug, fmt::Display, sync::Arc}; + +use tokio::{sync::mpsc::error::TrySendError, task::JoinHandle}; +use tokio_util::sync::CancellationToken; +use utils::id::NodeId; + +use crate::service::Service; + +pub(crate) const MAX_RECONCILES_PER_OPERATION: usize = 10; + +#[derive(Copy, Clone)] +pub(crate) struct Drain { + node_id: NodeId, +} + +#[derive(Copy, Clone)] +pub(crate) struct Fill { + node_id: NodeId, +} + +pub(crate) enum Operation { + Drain(Drain), + Fill(Fill), +} + +#[derive(Debug, thiserror::Error)] +pub(crate) enum OperationError { + #[error("Operation precondition failed: {0}")] + PreconditionFailed(Cow<'static, str>), + #[error("Node state changed during operation: {0}")] + NodeStateChanged(Cow<'static, str>), + #[error("Operation cancelled")] + Cancelled, + #[error("Shutting down")] + ShuttingDown, +} + +struct OperationHandler { + operation: Operation, + #[allow(unused)] + cancel: CancellationToken, + #[allow(unused)] + handle: JoinHandle>, +} + +#[derive(Default, Clone)] +struct OngoingOperations(Arc>>); + +pub(crate) struct Controller { + ongoing: OngoingOperations, + service: Arc, + sender: tokio::sync::mpsc::Sender, +} + +impl Controller { + pub(crate) fn new(service: Arc) -> (Self, tokio::sync::mpsc::Receiver) { + let (operations_tx, operations_rx) = tokio::sync::mpsc::channel(1); + ( + Self { + ongoing: Default::default(), + service, + sender: operations_tx, + }, + operations_rx, + ) + } + + pub(crate) fn drain_node(&self, node_id: NodeId) -> Result<(), OperationError> { + if let Some(handler) = self.ongoing.0.read().unwrap().get(&node_id) { + return Err(OperationError::PreconditionFailed( + format!( + "Background operation already ongoing for node: {}", + handler.operation + ) + .into(), + )); + } + + self.sender.try_send(Operation::Drain(Drain { node_id }))?; + + Ok(()) + } + + pub(crate) fn fill_node(&self, node_id: NodeId) -> Result<(), OperationError> { + if let Some(handler) = self.ongoing.0.read().unwrap().get(&node_id) { + return Err(OperationError::PreconditionFailed( + format!( + "Background operation already ongoing for node: {}", + handler.operation + ) + .into(), + )); + } + + self.sender.try_send(Operation::Fill(Fill { node_id }))?; + + Ok(()) + } + + pub(crate) async fn handle_operations( + &self, + mut receiver: tokio::sync::mpsc::Receiver, + ) { + while let Some(op) = receiver.recv().await { + match op { + Operation::Drain(drain) => self.handle_drain(drain), + Operation::Fill(fill) => self.handle_fill(fill), + } + } + } + + fn handle_drain(&self, _drain: Drain) { + todo!("A later commit implements this stub"); + } + + fn handle_fill(&self, _fill: Fill) { + todo!("A later commit implements this stub") + } +} + +impl From> for OperationError { + fn from(value: TrySendError) -> Self { + match value { + TrySendError::Full(_) => { + Self::PreconditionFailed("Too many background operation in progress".into()) + } + TrySendError::Closed(_) => Self::ShuttingDown, + } + } +} + +impl Display for Drain { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "drain {}", self.node_id) + } +} + +impl Display for Fill { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "fill {}", self.node_id) + } +} + +impl Display for Operation { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Operation::Drain(op) => write!(f, "{op}"), + Operation::Fill(op) => write!(f, "{op}"), + } + } +} + +impl Debug for Controller { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "backround_node_operations::Controller") + } +} diff --git a/storage_controller/src/lib.rs b/storage_controller/src/lib.rs index 2ea490a14b..8caf638904 100644 --- a/storage_controller/src/lib.rs +++ b/storage_controller/src/lib.rs @@ -2,6 +2,7 @@ use serde::Serialize; use utils::seqwait::MonotonicCounter; mod auth; +mod background_node_operations; mod compute_hook; mod heartbeater; pub mod http; diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 1e81b5c5a2..db3e700309 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -8,11 +8,12 @@ use std::{ }; use crate::{ + background_node_operations::{self, Controller}, compute_hook::NotifyError, id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, WrappedWriteGuard}, persistence::{AbortShardSplitStatus, TenantFilter}, reconciler::{ReconcileError, ReconcileUnits}, - scheduler::{ScheduleContext, ScheduleMode}, + scheduler::{MaySchedule, ScheduleContext, ScheduleMode}, tenant_shard::{ MigrateAttachment, ReconcileNeeded, ScheduleOptimization, ScheduleOptimizationAction, }, @@ -275,6 +276,8 @@ pub struct Service { /// use a VecDeque instead of a channel to reduce synchronization overhead, at the cost of some code complexity. delayed_reconcile_tx: tokio::sync::mpsc::Sender, + background_operations_controller: std::sync::OnceLock, + // Process shutdown will fire this token cancel: CancellationToken, @@ -1090,12 +1093,20 @@ impl Service { delayed_reconcile_tx, abort_tx, startup_complete: startup_complete.clone(), + background_operations_controller: Default::default(), cancel, gate: Gate::default(), tenant_op_locks: Default::default(), node_op_locks: Default::default(), }); + let (controller, node_operations_receiver) = + background_node_operations::Controller::new(this.clone()); + + this.background_operations_controller + .set(controller) + .expect("This is the only code path that sets the controller"); + let result_task_this = this.clone(); tokio::task::spawn(async move { // Block shutdown until we're done (we must respect self.cancel) @@ -1167,6 +1178,19 @@ impl Service { } }); + tokio::task::spawn({ + let this = this.clone(); + let startup_complete = startup_complete.clone(); + async move { + startup_complete.wait().await; + this.background_operations_controller + .get() + .expect("Initialized at start up") + .handle_operations(node_operations_receiver) + .await; + } + }); + Ok(this) }