diff --git a/Cargo.lock b/Cargo.lock
index dbbf330cf9..d1f3f1522e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5801,6 +5801,7 @@ dependencies = [
"r2d2",
"reqwest 0.12.4",
"routerify",
+ "scopeguard",
"serde",
"serde_json",
"strum",
diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml
index 194619a496..b54dea5d47 100644
--- a/storage_controller/Cargo.toml
+++ b/storage_controller/Cargo.toml
@@ -40,6 +40,7 @@ tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
measured.workspace = true
+scopeguard.workspace = true
strum.workspace = true
strum_macros.workspace = true
diff --git a/storage_controller/src/background_node_operations.rs b/storage_controller/src/background_node_operations.rs
index cbcd1e24b6..619452832e 100644
--- a/storage_controller/src/background_node_operations.rs
+++ b/storage_controller/src/background_node_operations.rs
@@ -109,12 +109,88 @@ impl Controller {
}
}
- fn handle_drain(&self, _drain: Drain) {
- todo!("A later commit implements this stub");
+ fn handle_drain(&self, drain: Drain) {
+ let node_id = drain.node_id;
+
+ let cancel = CancellationToken::new();
+
+ let (_holder, waiter) = utils::completion::channel();
+
+ let handle = tokio::task::spawn({
+ let service = self.service.clone();
+ let ongoing = self.ongoing.clone();
+ let cancel = cancel.clone();
+
+ async move {
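+ // On task exit (success, failure, or cancellation) remove this node's
+ // entry from the shared `ongoing` map so a new operation can be scheduled.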
+ scopeguard::defer! {
+ let removed = ongoing.0.write().unwrap().remove(&drain.node_id);
+ if let Some(Operation::Drain(removed_drain)) = removed.map(|h| h.operation) {
+ assert_eq!(removed_drain.node_id, drain.node_id, "We always remove the same operation");
+ } else {
+ panic!("We always remove the same operation")
+ }
+ }
+
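+ // Do not start the drain until `handle_drain` has registered the
+ // OperationHandler below; the waiter is released when `_holder` is
+ // dropped at the end of `handle_drain`.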
+ waiter.wait().await;
+ service.drain_node(drain.node_id, cancel).await
+ }
+ });
+
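+ // Register the operation; the spawned task is still blocked on the
+ // waiter above, so its tear-down guard cannot run before this insert.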
+ let replaced = self.ongoing.0.write().unwrap().insert(
+ node_id,
+ OperationHandler {
+ operation: Operation::Drain(drain),
+ cancel,
+ handle,
+ },
+ );
+
+ assert!(
+ replaced.is_none(),
+ "The channel size is 1 and we checked before enqueing"
+ );
}
- fn handle_fill(&self, _fill: Fill) {
- todo!("A later commit implements this stub")
+ fn handle_fill(&self, fill: Fill) {
+ let node_id = fill.node_id;
+
+ let cancel = CancellationToken::new();
+
+ let (_holder, waiter) = utils::completion::channel();
+
+ let handle = tokio::task::spawn({
+ let service = self.service.clone();
+ let ongoing = self.ongoing.clone();
+ let cancel = cancel.clone();
+
+ async move {
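+ // Same tear-down and ordering pattern as in handle_drain above.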
+ scopeguard::defer! {
+ let removed = ongoing.0.write().unwrap().remove(&fill.node_id);
+ if let Some(Operation::Fill(removed_fill)) = removed.map(|h| h.operation) {
+ assert_eq!(removed_fill.node_id, fill.node_id, "We always remove the same operation");
+ } else {
+ panic!("We always remove the same operation")
+ }
+ }
+
+ waiter.wait().await;
+ service.fill_node(fill.node_id, cancel).await
+ }
+ });
+
+ let replaced = self.ongoing.0.write().unwrap().insert(
+ node_id,
+ OperationHandler {
+ operation: Operation::Fill(fill),
+ cancel,
+ handle,
+ },
+ );
+
+ assert!(
+ replaced.is_none(),
+ "The channel size is 1 and we checked before enqueing"
+ );
}
}
diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs
index bbb6d2cb32..c627978c93 100644
--- a/storage_controller/src/http.rs
+++ b/storage_controller/src/http.rs
@@ -480,6 +480,28 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
)
}
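+/// Start draining a node: the drain itself runs as a background operation,
+/// so this handler returns 202 Accepted as soon as it has been enqueued.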
+async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+ check_permissions(&req, Scope::Admin)?;
+
+ let state = get_state(&req);
+ let node_id: NodeId = parse_request_param(&req, "node_id")?;
+
+ state.service.start_node_drain(node_id).await?;
+
+ json_response(StatusCode::ACCEPTED, ())
+}
+
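+/// Start filling a node; like the drain endpoint, this only enqueues the
+/// background operation and returns 202 Accepted.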
+async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+ check_permissions(&req, Scope::Admin)?;
+
+ let state = get_state(&req);
+ let node_id: NodeId = parse_request_param(&req, "node_id")?;
+
+ state.service.start_node_fill(node_id).await?;
+
+ json_response(StatusCode::ACCEPTED, ())
+}
+
async fn handle_tenant_shard_split(
    service: Arc<Service>,
    mut req: Request<Body>,
@@ -832,6 +854,13 @@ pub fn make_router(
RequestName("control_v1_node_config"),
)
})
+ .put("/control/v1/node/:node_id/drain", |r| {
+ named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
+ })
+ .put("/control/v1/node/:node_id/fill", |r| {
+ named_request_span(r, handle_node_fill, RequestName("control_v1_node_fill"))
+ })
+ // TODO(vlad): endpoint for cancelling drain and fill
// Tenant Shard operations
.put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
tenant_service_handler(
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index f65b17ace0..9b08e4ce7b 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -300,6 +300,19 @@ impl From for ApiError {
}
}
+impl From<OperationError> for ApiError {
+ fn from(value: OperationError) -> Self {
+ match value {
+ OperationError::PreconditionFailed(err) => ApiError::PreconditionFailed(err.into()),
+ OperationError::NodeStateChanged(err) => {
+ ApiError::InternalServerError(anyhow::anyhow!(err))
+ }
+ OperationError::ShuttingDown => ApiError::ShuttingDown,
+ OperationError::Cancelled => ApiError::Conflict("Operation was cancelled".into()),
+ }
+ }
+}
+
#[allow(clippy::large_enum_variant)]
enum TenantCreateOrUpdate {
Create(TenantCreateRequest),
@@ -4412,6 +4425,110 @@ impl Service {
Ok(())
}
+ pub(crate) async fn start_node_drain(&self, node_id: NodeId) -> Result<(), ApiError> {
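+ // Snapshot the node's state under the read lock and drop the guard
+ // before hitting any await points below.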
+ let (node_available, node_policy, schedulable_nodes_count) = {
+ let locked = self.inner.read().unwrap();
+ let nodes = &locked.nodes;
+ let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
+ anyhow::anyhow!("Node {} not registered", node_id).into(),
+ ))?;
+ let schedulable_nodes_count = nodes
+ .iter()
+ .filter(|(_, n)| matches!(n.may_schedule(), MaySchedule::Yes(_)))
+ .count();
+
+ (
+ node.is_available(),
+ node.get_scheduling(),
+ schedulable_nodes_count,
+ )
+ };
+
+ if !node_available {
+ return Err(ApiError::ResourceUnavailable(
+ format!("Node {node_id} is currently unavailable").into(),
+ ));
+ }
+
+ if schedulable_nodes_count == 0 {
+ return Err(ApiError::PreconditionFailed(
+ "No other schedulable nodes to drain to".into(),
+ ));
+ }
+
+ match node_policy {
+ NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Pause => {
+ self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Draining))
+ .await?;
+ let controller = self
+ .background_operations_controller
+ .get()
+ .expect("Initialized at start up");
+ controller.drain_node(node_id)?;
+ }
+ NodeSchedulingPolicy::Draining => {
+ return Err(ApiError::Conflict(format!(
+ "Node {node_id} has drain in progress"
+ )));
+ }
+ policy => {
+ return Err(ApiError::PreconditionFailed(
+ format!("Node {node_id} cannot be drained due to {policy:?} policy").into(),
+ ));
+ }
+ }
+
+ Ok(())
+ }
+
+ pub(crate) async fn start_node_fill(&self, node_id: NodeId) -> Result<(), ApiError> {
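+ // Mirrors start_node_drain: snapshot under the read lock, validate,
+ // then hand the work off to the background operations controller.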
+ let (node_available, node_policy, total_nodes_count) = {
+ let locked = self.inner.read().unwrap();
+ let nodes = &locked.nodes;
+ let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
+ anyhow::anyhow!("Node {} not registered", node_id).into(),
+ ))?;
+
+ (node.is_available(), node.get_scheduling(), nodes.len())
+ };
+
+ if !node_available {
+ return Err(ApiError::ResourceUnavailable(
+ format!("Node {node_id} is currently unavailable").into(),
+ ));
+ }
+
+ if total_nodes_count <= 1 {
+ return Err(ApiError::PreconditionFailed(
+ "No other nodes to fill from".into(),
+ ));
+ }
+
+ match node_policy {
+ NodeSchedulingPolicy::Active => {
+ self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Filling))
+ .await?;
+ let controller = self
+ .background_operations_controller
+ .get()
+ .expect("Initialized at start up");
+ controller.fill_node(node_id)?;
+ }
+ NodeSchedulingPolicy::Filling => {
+ return Err(ApiError::Conflict(format!(
+ "Node {node_id} has fill in progress"
+ )));
+ }
+ policy => {
+ return Err(ApiError::PreconditionFailed(
+ format!("Node {node_id} cannot be filled due to {policy:?} policy").into(),
+ ));
+ }
+ }
+
+ Ok(())
+ }
+
/// Helper for methods that will try and call pageserver APIs for
/// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
/// is attached somewhere.