From fce602ed3022c624127f2003d92130cde0ec5b37 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Mon, 17 Jun 2024 16:22:44 +0100 Subject: [PATCH] review: remove Controller entity --- .../src/background_node_operations.rs | 190 +----------------- storage_controller/src/service.rs | 128 ++++++++---- 2 files changed, 94 insertions(+), 224 deletions(-) diff --git a/storage_controller/src/background_node_operations.rs b/storage_controller/src/background_node_operations.rs index 619452832e..279f2bb7b2 100644 --- a/storage_controller/src/background_node_operations.rs +++ b/storage_controller/src/background_node_operations.rs @@ -1,23 +1,21 @@ -use std::{borrow::Cow, collections::HashMap, fmt::Debug, fmt::Display, sync::Arc}; +use std::{borrow::Cow, fmt::Debug, fmt::Display}; -use tokio::{sync::mpsc::error::TrySendError, task::JoinHandle}; use tokio_util::sync::CancellationToken; use utils::id::NodeId; -use crate::service::Service; - pub(crate) const MAX_RECONCILES_PER_OPERATION: usize = 10; #[derive(Copy, Clone)] pub(crate) struct Drain { - node_id: NodeId, + pub(crate) node_id: NodeId, } #[derive(Copy, Clone)] pub(crate) struct Fill { - node_id: NodeId, + pub(crate) node_id: NodeId, } +#[derive(Copy, Clone)] pub(crate) enum Operation { Drain(Drain), Fill(Fill), @@ -25,184 +23,16 @@ pub(crate) enum Operation { #[derive(Debug, thiserror::Error)] pub(crate) enum OperationError { - #[error("Operation precondition failed: {0}")] - PreconditionFailed(Cow<'static, str>), #[error("Node state changed during operation: {0}")] NodeStateChanged(Cow<'static, str>), #[error("Operation cancelled")] Cancelled, - #[error("Shutting down")] - ShuttingDown, } -struct OperationHandler { - operation: Operation, +pub(crate) struct OperationHandler { + pub(crate) operation: Operation, #[allow(unused)] - cancel: CancellationToken, - #[allow(unused)] - handle: JoinHandle>, -} - -#[derive(Default, Clone)] -struct OngoingOperations(Arc>>); - -pub(crate) struct Controller { - ongoing: OngoingOperations, - service: Arc, - sender: tokio::sync::mpsc::Sender, -} - -impl Controller { - pub(crate) fn new(service: Arc) -> (Self, tokio::sync::mpsc::Receiver) { - let (operations_tx, operations_rx) = tokio::sync::mpsc::channel(1); - ( - Self { - ongoing: Default::default(), - service, - sender: operations_tx, - }, - operations_rx, - ) - } - - pub(crate) fn drain_node(&self, node_id: NodeId) -> Result<(), OperationError> { - if let Some(handler) = self.ongoing.0.read().unwrap().get(&node_id) { - return Err(OperationError::PreconditionFailed( - format!( - "Background operation already ongoing for node: {}", - handler.operation - ) - .into(), - )); - } - - self.sender.try_send(Operation::Drain(Drain { node_id }))?; - - Ok(()) - } - - pub(crate) fn fill_node(&self, node_id: NodeId) -> Result<(), OperationError> { - if let Some(handler) = self.ongoing.0.read().unwrap().get(&node_id) { - return Err(OperationError::PreconditionFailed( - format!( - "Background operation already ongoing for node: {}", - handler.operation - ) - .into(), - )); - } - - self.sender.try_send(Operation::Fill(Fill { node_id }))?; - - Ok(()) - } - - pub(crate) async fn handle_operations( - &self, - mut receiver: tokio::sync::mpsc::Receiver, - ) { - while let Some(op) = receiver.recv().await { - match op { - Operation::Drain(drain) => self.handle_drain(drain), - Operation::Fill(fill) => self.handle_fill(fill), - } - } - } - - fn handle_drain(&self, drain: Drain) { - let node_id = drain.node_id; - - let cancel = CancellationToken::new(); - - let (_holder, waiter) = utils::completion::channel(); - - let handle = tokio::task::spawn({ - let service = self.service.clone(); - let ongoing = self.ongoing.clone(); - let cancel = cancel.clone(); - - async move { - scopeguard::defer! { - let removed = ongoing.0.write().unwrap().remove(&drain.node_id); - if let Some(Operation::Drain(removed_drain)) = removed.map(|h| h.operation) { - assert_eq!(removed_drain.node_id, drain.node_id, "We always remove the same operation"); - } else { - panic!("We always remove the same operation") - } - } - - waiter.wait().await; - service.drain_node(drain.node_id, cancel).await - } - }); - - let replaced = self.ongoing.0.write().unwrap().insert( - node_id, - OperationHandler { - operation: Operation::Drain(drain), - cancel, - handle, - }, - ); - - assert!( - replaced.is_none(), - "The channel size is 1 and we checked before enqueing" - ); - } - - fn handle_fill(&self, fill: Fill) { - let node_id = fill.node_id; - - let cancel = CancellationToken::new(); - - let (_holder, waiter) = utils::completion::channel(); - - let handle = tokio::task::spawn({ - let service = self.service.clone(); - let ongoing = self.ongoing.clone(); - let cancel = cancel.clone(); - - async move { - scopeguard::defer! { - let removed = ongoing.0.write().unwrap().remove(&fill.node_id); - if let Some(Operation::Fill(removed_fill)) = removed.map(|h| h.operation) { - assert_eq!(removed_fill.node_id, fill.node_id, "We always remove the same operation"); - } else { - panic!("We always remove the same operation") - } - } - - waiter.wait().await; - service.fill_node(fill.node_id, cancel).await - } - }); - - let replaced = self.ongoing.0.write().unwrap().insert( - node_id, - OperationHandler { - operation: Operation::Fill(fill), - cancel, - handle, - }, - ); - - assert!( - replaced.is_none(), - "The channel size is 1 and we checked before enqueing" - ); - } -} - -impl From> for OperationError { - fn from(value: TrySendError) -> Self { - match value { - TrySendError::Full(_) => { - Self::PreconditionFailed("Too many background operation in progress".into()) - } - TrySendError::Closed(_) => Self::ShuttingDown, - } - } + pub(crate) cancel: CancellationToken, } impl Display for Drain { @@ -225,9 +55,3 @@ impl Display for Operation { } } } - -impl Debug for Controller { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "backround_node_operations::Controller") - } -} diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 9c9819d7b0..6d43519ec5 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -8,7 +8,9 @@ use std::{ }; use crate::{ - background_node_operations::{self, Controller, OperationError, MAX_RECONCILES_PER_OPERATION}, + background_node_operations::{ + Drain, Fill, Operation, OperationError, OperationHandler, MAX_RECONCILES_PER_OPERATION, + }, compute_hook::NotifyError, id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, WrappedWriteGuard}, persistence::{AbortShardSplitStatus, TenantFilter}, @@ -136,6 +138,8 @@ struct ServiceState { scheduler: Scheduler, + ongoing_operation: Option, + /// Queue of tenants who are waiting for concurrency limits to permit them to reconcile delayed_reconcile_rx: tokio::sync::mpsc::Receiver, } @@ -187,6 +191,7 @@ impl ServiceState { tenants, nodes: Arc::new(nodes), scheduler, + ongoing_operation: None, delayed_reconcile_rx, } } @@ -277,8 +282,6 @@ pub struct Service { /// use a VecDeque instead of a channel to reduce synchronization overhead, at the cost of some code complexity. delayed_reconcile_tx: tokio::sync::mpsc::Sender, - background_operations_controller: std::sync::OnceLock, - // Process shutdown will fire this token cancel: CancellationToken, @@ -303,11 +306,9 @@ impl From for ApiError { impl From for ApiError { fn from(value: OperationError) -> Self { match value { - OperationError::PreconditionFailed(err) => ApiError::PreconditionFailed(err.into()), OperationError::NodeStateChanged(err) => { ApiError::InternalServerError(anyhow::anyhow!(err)) } - OperationError::ShuttingDown => ApiError::ShuttingDown, OperationError::Cancelled => ApiError::Conflict("Operation was cancelled".into()), } } @@ -1107,20 +1108,12 @@ impl Service { delayed_reconcile_tx, abort_tx, startup_complete: startup_complete.clone(), - background_operations_controller: Default::default(), cancel, gate: Gate::default(), tenant_op_locks: Default::default(), node_op_locks: Default::default(), }); - let (controller, node_operations_receiver) = - background_node_operations::Controller::new(this.clone()); - - this.background_operations_controller - .set(controller) - .expect("This is the only code path that sets the controller"); - let result_task_this = this.clone(); tokio::task::spawn(async move { // Block shutdown until we're done (we must respect self.cancel) @@ -1192,19 +1185,6 @@ impl Service { } }); - tokio::task::spawn({ - let this = this.clone(); - let startup_complete = startup_complete.clone(); - async move { - startup_complete.wait().await; - this.background_operations_controller - .get() - .expect("Initialized at start up") - .handle_operations(node_operations_receiver) - .await; - } - }); - Ok(this) } @@ -4451,8 +4431,11 @@ impl Service { Ok(()) } - pub(crate) async fn start_node_drain(&self, node_id: NodeId) -> Result<(), ApiError> { - let (node_available, node_policy, schedulable_nodes_count) = { + pub(crate) async fn start_node_drain( + self: &Arc, + node_id: NodeId, + ) -> Result<(), ApiError> { + let (ongoing_op, node_available, node_policy, schedulable_nodes_count) = { let locked = self.inner.read().unwrap(); let nodes = &locked.nodes; let node = nodes.get(&node_id).ok_or(ApiError::NotFound( @@ -4464,12 +4447,22 @@ impl Service { .count(); ( + locked + .ongoing_operation + .as_ref() + .map(|ongoing| ongoing.operation), node.is_available(), node.get_scheduling(), schedulable_nodes_count, ) }; + if let Some(ongoing) = ongoing_op { + return Err(ApiError::PreconditionFailed( + format!("Background operation already ongoing for node: {}", ongoing).into(), + )); + } + if !node_available { return Err(ApiError::ResourceUnavailable( format!("Node {node_id} is currently unavailable").into(), @@ -4486,11 +4479,30 @@ impl Service { NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Pause => { self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Draining)) .await?; - let controller = self - .background_operations_controller - .get() - .expect("Initialized at start up"); - controller.drain_node(node_id)?; + + let cancel = CancellationToken::new(); + + self.inner.write().unwrap().ongoing_operation = Some(OperationHandler { + operation: Operation::Drain(Drain { node_id }), + cancel: cancel.clone(), + }); + + tokio::task::spawn({ + let service = self.clone(); + let cancel = cancel.clone(); + async move { + scopeguard::defer! { + let prev = service.inner.write().unwrap().ongoing_operation.take(); + + if let Some(Operation::Drain(removed_drain)) = prev.map(|h| h.operation) { + assert_eq!(removed_drain.node_id, node_id, "We always take the same operation"); + } else { + panic!("We always remove the same operation") + } + } + service.drain_node(node_id, cancel).await + } + }); } NodeSchedulingPolicy::Draining => { return Err(ApiError::Conflict(format!( @@ -4507,17 +4519,31 @@ impl Service { Ok(()) } - pub(crate) async fn start_node_fill(&self, node_id: NodeId) -> Result<(), ApiError> { - let (node_available, node_policy, total_nodes_count) = { + pub(crate) async fn start_node_fill(self: &Arc, node_id: NodeId) -> Result<(), ApiError> { + let (ongoing_op, node_available, node_policy, total_nodes_count) = { let locked = self.inner.read().unwrap(); let nodes = &locked.nodes; let node = nodes.get(&node_id).ok_or(ApiError::NotFound( anyhow::anyhow!("Node {} not registered", node_id).into(), ))?; - (node.is_available(), node.get_scheduling(), nodes.len()) + ( + locked + .ongoing_operation + .as_ref() + .map(|ongoing| ongoing.operation), + node.is_available(), + node.get_scheduling(), + nodes.len(), + ) }; + if let Some(ongoing) = ongoing_op { + return Err(ApiError::PreconditionFailed( + format!("Background operation already ongoing for node: {}", ongoing).into(), + )); + } + if !node_available { return Err(ApiError::ResourceUnavailable( format!("Node {node_id} is currently unavailable").into(), @@ -4534,11 +4560,31 @@ impl Service { NodeSchedulingPolicy::Active => { self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Filling)) .await?; - let controller = self - .background_operations_controller - .get() - .expect("Initialized at start up"); - controller.fill_node(node_id)?; + + let cancel = CancellationToken::new(); + + self.inner.write().unwrap().ongoing_operation = Some(OperationHandler { + operation: Operation::Fill(Fill { node_id }), + cancel: cancel.clone(), + }); + + tokio::task::spawn({ + let service = self.clone(); + let cancel = cancel.clone(); + async move { + scopeguard::defer! { + let prev = service.inner.write().unwrap().ongoing_operation.take(); + + if let Some(Operation::Fill(removed_fill)) = prev.map(|h| h.operation) { + assert_eq!(removed_fill.node_id, node_id, "We always take the same operation"); + } else { + panic!("We always remove the same operation") + } + } + + service.fill_node(node_id, cancel).await + } + }); } NodeSchedulingPolicy::Filling => { return Err(ApiError::Conflict(format!(