Mirror of https://github.com/neondatabase/neon.git
storcon: plug drain and fill operations to the controller

Cargo.lock (generated)
@@ -5801,6 +5801,7 @@ dependencies = [
  "r2d2",
  "reqwest 0.12.4",
  "routerify",
+ "scopeguard",
  "serde",
  "serde_json",
  "strum",

@@ -40,6 +40,7 @@ tokio.workspace = true
 tokio-util.workspace = true
 tracing.workspace = true
 measured.workspace = true
+scopeguard.workspace = true
 strum.workspace = true
 strum_macros.workspace = true

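Of the dependency lines above, scopeguard appears to be the newly added one: its defer! macro runs a block when the enclosing scope exits (including on early return or panic), which is how the drain and fill handlers below guarantee that a finished operation is always removed from the ongoing map. A minimal illustration, assuming scopeguard 1.x:

// Minimal illustration of scopeguard::defer!: the block runs when the
// enclosing scope exits, whether by normal return, early return, or panic.
fn main() {
    scopeguard::defer! {
        println!("cleanup");
    }
    println!("work");
    // Prints "work" first, then "cleanup" as the scope unwinds.
}
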
@@ -109,12 +109,88 @@ impl Controller {
         }
     }
 
-    fn handle_drain(&self, _drain: Drain) {
-        todo!("A later commit implements this stub");
+    fn handle_drain(&self, drain: Drain) {
+        let node_id = drain.node_id;
+
+        let cancel = CancellationToken::new();
+
+        let (_holder, waiter) = utils::completion::channel();
+
+        let handle = tokio::task::spawn({
+            let service = self.service.clone();
+            let ongoing = self.ongoing.clone();
+            let cancel = cancel.clone();
+
+            async move {
+                scopeguard::defer! {
+                    let removed = ongoing.0.write().unwrap().remove(&drain.node_id);
+                    if let Some(Operation::Drain(removed_drain)) = removed.map(|h| h.operation) {
+                        assert_eq!(removed_drain.node_id, drain.node_id, "We always remove the same operation");
+                    } else {
+                        panic!("We always remove the same operation")
+                    }
+                }
+
+                waiter.wait().await;
+                service.drain_node(drain.node_id, cancel).await
+            }
+        });
+
+        let replaced = self.ongoing.0.write().unwrap().insert(
+            node_id,
+            OperationHandler {
+                operation: Operation::Drain(drain),
+                cancel,
+                handle,
+            },
+        );
+
+        assert!(
+            replaced.is_none(),
+            "The channel size is 1 and we checked before enqueing"
+        );
     }
 
-    fn handle_fill(&self, _fill: Fill) {
-        todo!("A later commit implements this stub")
+    fn handle_fill(&self, fill: Fill) {
+        let node_id = fill.node_id;
+
+        let cancel = CancellationToken::new();
+
+        let (_holder, waiter) = utils::completion::channel();
+
+        let handle = tokio::task::spawn({
+            let service = self.service.clone();
+            let ongoing = self.ongoing.clone();
+            let cancel = cancel.clone();
+
+            async move {
+                scopeguard::defer! {
+                    let removed = ongoing.0.write().unwrap().remove(&fill.node_id);
+                    if let Some(Operation::Fill(removed_fill)) = removed.map(|h| h.operation) {
+                        assert_eq!(removed_fill.node_id, fill.node_id, "We always remove the same operation");
+                    } else {
+                        panic!("We always remove the same operation")
+                    }
+                }
+
+                waiter.wait().await;
+                service.fill_node(fill.node_id, cancel).await
+            }
+        });
+
+        let replaced = self.ongoing.0.write().unwrap().insert(
+            node_id,
+            OperationHandler {
+                operation: Operation::Fill(fill),
+                cancel,
+                handle,
+            },
+        );
+
+        assert!(
+            replaced.is_none(),
+            "The channel size is 1 and we checked before enqueing"
+        );
     }
 }

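The hunk only shows the handler bodies; Drain, Fill, Operation, OperationHandler and the shared ongoing map are defined elsewhere and not part of this diff. Below is a rough sketch of shapes that would satisfy the usage above. The field names, the tuple-struct access ongoing.0, and the spawned task's return type are assumptions, and NodeId and OperationError are stand-ins for the real crate types:

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

// Placeholders for types defined elsewhere in the crate; the real error type
// is sketched further below next to the From<OperationError> impl.
type NodeId = u64;
#[derive(Debug)]
struct OperationError;

// `drain` is used both inside the spawned task and afterwards when the
// handler is inserted into the map, so Copy (or an explicit clone) is needed.
#[derive(Clone, Copy)]
struct Drain {
    node_id: NodeId,
}

#[derive(Clone, Copy)]
struct Fill {
    node_id: NodeId,
}

enum Operation {
    Drain(Drain),
    Fill(Fill),
}

struct OperationHandler {
    operation: Operation,
    cancel: CancellationToken,
    handle: JoinHandle<Result<(), OperationError>>,
}

// `ongoing.0.write().unwrap()` suggests a tuple struct wrapping a shared map,
// keyed by the node an operation targets; cloning it clones the Arc.
#[derive(Clone)]
struct Ongoing(Arc<RwLock<HashMap<NodeId, OperationHandler>>>);

Note also the (_holder, waiter) completion pair: the spawned task awaits the waiter first, which presumably resolves only once _holder is dropped at the end of handle_drain/handle_fill, so neither the drain/fill body nor the defer! cleanup can run before the operation has been inserted into the ongoing map.
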
@@ -480,6 +480,28 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>,
     )
 }
 
+async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    check_permissions(&req, Scope::Admin)?;
+
+    let state = get_state(&req);
+    let node_id: NodeId = parse_request_param(&req, "node_id")?;
+
+    state.service.start_node_drain(node_id).await?;
+
+    json_response(StatusCode::ACCEPTED, ())
+}
+
+async fn handle_node_fill(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    check_permissions(&req, Scope::Admin)?;
+
+    let state = get_state(&req);
+    let node_id: NodeId = parse_request_param(&req, "node_id")?;
+
+    state.service.start_node_fill(node_id).await?;
+
+    json_response(StatusCode::ACCEPTED, ())
+}
+
 async fn handle_tenant_shard_split(
     service: Arc<Service>,
     mut req: Request<Body>,

@@ -832,6 +854,13 @@ pub fn make_router(
                 RequestName("control_v1_node_config"),
             )
         })
+        .put("/control/v1/node/:node_id/drain", |r| {
+            named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain"))
+        })
+        .put("/control/v1/node/:node_id/fill", |r| {
+            named_request_span(r, handle_node_fill, RequestName("control_v1_node_fill"))
+        })
+        // TODO(vlad): endpoint for cancelling drain and fill
         // Tenant Shard operations
         .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
             tenant_service_handler(

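With these routes registered, starting a drain or fill is a single admin-scoped PUT; the handler replies 202 Accepted once the operation has been validated and handed to the background controller. A hedged sketch of a client call (base URL, port, node id and token are placeholders, not taken from the diff):

// Hypothetical client call against the new endpoint; adjust URL and auth for
// a real storage controller deployment.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    let resp = client
        .put("http://127.0.0.1:1234/control/v1/node/1/drain")
        .bearer_auth("<admin-scoped JWT>")
        .send()
        .await?;

    // 202 Accepted: the drain has been started in the background.
    assert_eq!(resp.status(), reqwest::StatusCode::ACCEPTED);
    Ok(())
}
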
@@ -300,6 +300,19 @@ impl From<ReconcileWaitError> for ApiError {
     }
 }
 
+impl From<OperationError> for ApiError {
+    fn from(value: OperationError) -> Self {
+        match value {
+            OperationError::PreconditionFailed(err) => ApiError::PreconditionFailed(err.into()),
+            OperationError::NodeStateChanged(err) => {
+                ApiError::InternalServerError(anyhow::anyhow!(err))
+            }
+            OperationError::ShuttingDown => ApiError::ShuttingDown,
+            OperationError::Cancelled => ApiError::Conflict("Operation was cancelled".into()),
+        }
+    }
+}
+
 #[allow(clippy::large_enum_variant)]
 enum TenantCreateOrUpdate {
     Create(TenantCreateRequest),

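For the conversion above to compile, OperationError needs roughly the following shape. This is a sketch inferred from the match arms only; the String payloads and the derives are assumptions:

// Hypothetical shape of OperationError, inferred from the From<OperationError>
// impl: two variants carry a message, two are unit-like.
#[derive(Debug, Clone)]
enum OperationError {
    PreconditionFailed(String),
    NodeStateChanged(String),
    ShuttingDown,
    Cancelled,
}
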
@@ -4412,6 +4425,110 @@ impl Service {
         Ok(())
     }
 
+    pub(crate) async fn start_node_drain(&self, node_id: NodeId) -> Result<(), ApiError> {
+        let (node_available, node_policy, schedulable_nodes_count) = {
+            let locked = self.inner.read().unwrap();
+            let nodes = &locked.nodes;
+            let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
+                anyhow::anyhow!("Node {} not registered", node_id).into(),
+            ))?;
+            let schedulable_nodes_count = nodes
+                .iter()
+                .filter(|(_, n)| matches!(n.may_schedule(), MaySchedule::Yes(_)))
+                .count();
+
+            (
+                node.is_available(),
+                node.get_scheduling(),
+                schedulable_nodes_count,
+            )
+        };
+
+        if !node_available {
+            return Err(ApiError::ResourceUnavailable(
+                format!("Node {node_id} is currently unavailable").into(),
+            ));
+        }
+
+        if schedulable_nodes_count == 0 {
+            return Err(ApiError::PreconditionFailed(
+                "No other schedulable nodes to drain to".into(),
+            ));
+        }
+
+        match node_policy {
+            NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Pause => {
+                self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Draining))
+                    .await?;
+                let controller = self
+                    .background_operations_controller
+                    .get()
+                    .expect("Initialized at start up");
+                controller.drain_node(node_id)?;
+            }
+            NodeSchedulingPolicy::Draining => {
+                return Err(ApiError::Conflict(format!(
+                    "Node {node_id} has drain in progress"
+                )));
+            }
+            policy => {
+                return Err(ApiError::PreconditionFailed(
+                    format!("Node {node_id} cannot be drained due to {policy:?} policy").into(),
+                ));
+            }
+        }
+
+        Ok(())
+    }
+
+    pub(crate) async fn start_node_fill(&self, node_id: NodeId) -> Result<(), ApiError> {
+        let (node_available, node_policy, total_nodes_count) = {
+            let locked = self.inner.read().unwrap();
+            let nodes = &locked.nodes;
+            let node = nodes.get(&node_id).ok_or(ApiError::NotFound(
+                anyhow::anyhow!("Node {} not registered", node_id).into(),
+            ))?;
+
+            (node.is_available(), node.get_scheduling(), nodes.len())
+        };
+
+        if !node_available {
+            return Err(ApiError::ResourceUnavailable(
+                format!("Node {node_id} is currently unavailable").into(),
+            ));
+        }
+
+        if total_nodes_count <= 1 {
+            return Err(ApiError::PreconditionFailed(
+                "No other nodes to fill from".into(),
+            ));
+        }
+
+        match node_policy {
+            NodeSchedulingPolicy::Active => {
+                self.node_configure(node_id, None, Some(NodeSchedulingPolicy::Filling))
+                    .await?;
+                let controller = self
+                    .background_operations_controller
+                    .get()
+                    .expect("Initialized at start up");
+                controller.fill_node(node_id)?;
+            }
+            NodeSchedulingPolicy::Filling => {
+                return Err(ApiError::Conflict(format!(
+                    "Node {node_id} has fill in progress"
+                )));
+            }
+            policy => {
+                return Err(ApiError::PreconditionFailed(
+                    format!("Node {node_id} cannot be filled due to {policy:?} policy").into(),
+                ));
+            }
+        }
+
+        Ok(())
+    }
+
     /// Helper for methods that will try and call pageserver APIs for
     /// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
     /// is attached somewhere.

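start_node_drain and start_node_fill only validate preconditions, flip the node's scheduling policy, and then hand the node id to the background operations controller; any OperationError they get back is surfaced to the HTTP caller through the From<OperationError> impl above. The assertion message in handle_drain ("The channel size is 1 and we checked before enqueing") suggests that drain_node/fill_node push requests into a bounded, capacity-one channel feeding the controller loop. A speculative sketch of that enqueue side, reusing Drain, Fill and NodeId from the first sketch and OperationError as sketched above (names, signatures and the error mapping are assumptions, not shown in this diff):

use tokio::sync::mpsc;

// Hypothetical request type flowing from Service into the controller loop.
enum OperationRequest {
    Drain(Drain),
    Fill(Fill),
}

struct Controller {
    // Bounded to one in-flight request, e.g. created with mpsc::channel(1).
    sender: mpsc::Sender<OperationRequest>,
}

impl Controller {
    fn drain_node(&self, node_id: NodeId) -> Result<(), OperationError> {
        // try_send fails immediately if another operation is already queued,
        // which start_node_drain surfaces to the caller as an ApiError.
        self.sender
            .try_send(OperationRequest::Drain(Drain { node_id }))
            .map_err(|_| {
                OperationError::PreconditionFailed(
                    "another background operation is already queued".to_string(),
                )
            })
    }
}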