Better storcon API
(commit to https://github.com/neondatabase/neon.git)

@@ -211,6 +211,8 @@ enum Command {
     StartDrain {
         #[arg(long)]
         node_id: NodeId,
+        #[arg(long)]
+        drain_all: Option<bool>,
     },
     /// Cancel draining the specified pageserver and wait for `timeout`
     /// for the operation to be canceled. May be retried.

@@ -906,7 +908,7 @@ async fn main() -> anyhow::Result<()> {
             match &storcon_client
                 .dispatch::<(), NodeDescribeResponse>(
                     Method::GET,
-                    format!("control/v1/node/{node_id}"),
+                    format!("control/v1/node/{node_id}?drain_all=true"),
                     None,
                 )
                 .await?

@@ -1146,13 +1148,14 @@ async fn main() -> anyhow::Result<()> {
                 failure
             );
         }
-        Command::StartDrain { node_id } => {
+        Command::StartDrain { node_id, drain_all } => {
+            let path = if drain_all == Some(true) {
+                format!("control/v1/node/{node_id}/drain?drain_all=true")
+            } else {
+                format!("control/v1/node/{node_id}/drain")
+            };
             storcon_client
-                .dispatch::<(), ()>(
-                    Method::PUT,
-                    format!("control/v1/node/{node_id}/drain"),
-                    None,
-                )
+                .dispatch::<(), ()>(Method::PUT, path, None)
                 .await?;
             println!("Drain started for {node_id}");
         }

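With this in place, a full drain would be requested from the CLI as something like `storcon_cli start-drain --node-id 1 --drain-all true` (assuming clap's default kebab-case command naming and the usual `storcon_cli` binary name; neither is shown in this diff), which issues `PUT control/v1/node/1/drain?drain_all=true` against the storage controller. Omitting the flag falls through to the unqualified `PUT control/v1/node/1/drain`, i.e. the old behavior. Because the field is `Option<bool>`, the flag takes an explicit value rather than acting as a bare switch, and only `Some(true)` selects the new path.
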
@@ -47,9 +47,7 @@ use crate::metrics::{
 };
 use crate::persistence::SafekeeperUpsert;
 use crate::reconciler::ReconcileError;
-use crate::service::{
-    LeadershipStatus, NodeDrainMode, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT, Service,
-};
+use crate::service::{LeadershipStatus, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT, Service};

 /// State available to HTTP request handlers
 pub struct HttpState {

@@ -1002,18 +1000,9 @@ async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiError> {

     let state = get_state(&req);
     let node_id: NodeId = parse_request_param(&req, "node_id")?;
-    let graceful: bool = parse_query_param(&req, "graceful")?.unwrap_or(false);
+    let drain_all: bool = parse_query_param(&req, "drain_all")?.unwrap_or(false);

-    let node_drain_mode = if graceful {
-        NodeDrainMode::FullWithReprovisioning
-    } else {
-        NodeDrainMode::AttachedOnly
-    };
-
-    state
-        .service
-        .start_node_drain(node_id, node_drain_mode)
-        .await?;
+    state.service.start_node_drain(node_id, drain_all).await?;

     json_response(StatusCode::ACCEPTED, ())
 }

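For reference, the contract implied by `parse_query_param(&req, "drain_all")?.unwrap_or(false)` is: parameter absent means `false` (preserving the old behavior), `?drain_all=true` means `true`, and an unparsable value surfaces as a client error rather than a silent default. A minimal standalone sketch of that contract (illustrative only, not the controller's actual helper):

    use std::str::FromStr;

    // Illustrative stand-in for parse_query_param(..)?.unwrap_or(false):
    // absent -> Ok(false), "true"/"false" -> parsed, anything else -> Err.
    fn parse_drain_all(query: &str) -> Result<bool, String> {
        for pair in query.split('&') {
            if let Some(value) = pair.strip_prefix("drain_all=") {
                // Unparsable values surface as an error, not a default.
                return bool::from_str(value).map_err(|e| format!("bad drain_all: {e}"));
            }
        }
        // The parameter is optional; omitting it keeps the old primary-only drain.
        Ok(false)
    }

    fn main() {
        assert_eq!(parse_drain_all(""), Ok(false));
        assert_eq!(parse_drain_all("drain_all=true"), Ok(true));
        assert!(parse_drain_all("drain_all=maybe").is_err());
    }
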
@@ -687,16 +687,6 @@ struct ShardMutationLocations {
 #[derive(Default, Clone)]
 struct TenantMutationLocations(BTreeMap<TenantShardId, ShardMutationLocations>);

-#[allow(dead_code)]
-pub(crate) enum NodeDrainMode {
-    /// Drain only attached locations (primary node itself)
-    AttachedOnly,
-    /// Drain attached and secondary locations (but don't create new secondaries)
-    AttachedAndSecondaries,
-    /// Drain everything and also provision new secondaries for the newly promoted primaries
-    FullWithReprovisioning,
-}
-
 impl Service {
     pub fn get_config(&self) -> &Config {
         &self.config

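Reading the two sides of this change together: the deleted enum's `AttachedOnly` corresponds to the new `drain_all == false`, `AttachedAndSecondaries` to `drain_all == true`, and `FullWithReprovisioning` (previously selected via the now-removed `graceful` query parameter) has no equivalent in the new API. Since the old `start_node_drain` received the enum as an unused `_mode` parameter (see the next hunk) and the enum itself carried `#[allow(dead_code)]`, the commit effectively swaps unreferenced scaffolding for a flag that is actually consulted. A sketch of that mapping, reusing the removed enum purely for illustration:

    // Illustration only: NodeDrainMode is deleted by this commit. This maps
    // the old variants onto the new boolean to show what the change keeps.
    enum NodeDrainMode {
        AttachedOnly,
        AttachedAndSecondaries,
        FullWithReprovisioning,
    }

    fn to_drain_all(mode: NodeDrainMode) -> Option<bool> {
        match mode {
            NodeDrainMode::AttachedOnly => Some(false),
            NodeDrainMode::AttachedAndSecondaries => Some(true),
            // Dropped: the new API never provisions fresh secondaries.
            NodeDrainMode::FullWithReprovisioning => None,
        }
    }

    fn main() {
        assert!(matches!(
            to_drain_all(NodeDrainMode::AttachedAndSecondaries),
            Some(true)
        ));
    }
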
@@ -7558,7 +7548,7 @@ impl Service {
     pub(crate) async fn start_node_drain(
         self: &Arc<Self>,
         node_id: NodeId,
-        _mode: NodeDrainMode,
+        drain_all: bool,
     ) -> Result<(), ApiError> {
         let (ongoing_op, node_available, node_policy, schedulable_nodes_count) = {
             let locked = self.inner.read().unwrap();

@@ -7632,7 +7622,7 @@ impl Service {
                 }

                 tracing::info!("Drain background operation starting");
-                let res = service.drain_node(node_id, cancel).await;
+                let res = service.drain_node(node_id, drain_all, cancel).await;
                 match res {
                     Ok(()) => {
                         tracing::info!("Drain background operation completed successfully");

@@ -8798,9 +8788,30 @@ impl Service {
         }
     }

-    /// Drain a node by moving the shards attached to it as primaries.
-    /// This is a long running operation and it should run as a separate Tokio task.
+    /// Drain a node by moving shards that are attached to it, either as primaries or secondaries.
+    /// When `drain_all` is false, only primary attachments are moved - this is used during node
+    /// deployment when the node is expected to return to service soon. When `drain_all` is true,
+    /// both primary and secondary attachments are moved - this is used when permanently removing
+    /// a node.
+    ///
+    /// This is a long running operation that should be spawned as a separate Tokio task.
     pub(crate) async fn drain_node(
         self: &Arc<Self>,
         node_id: NodeId,
+        drain_all: bool,
+        cancel: CancellationToken,
+    ) -> Result<(), OperationError> {
+        self.drain_primary_attachments(node_id, cancel.clone())
+            .await?;
+        if drain_all {
+            self.drain_secondary_attachments(node_id, cancel).await?;
+        }
+        Ok(())
+    }
+
+    /// Drain a node by moving the shards attached to it as primaries.
+    /// This is a long running operation
+    async fn drain_primary_attachments(
+        self: &Arc<Self>,
+        node_id: NodeId,
         cancel: CancellationToken,

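The restructured `drain_node` is now a thin dispatcher: the primary-attachment drain always runs, the secondary-attachment drain runs only when `drain_all` is set, and both phases share one `CancellationToken`. A self-contained sketch of that shape (assuming `tokio` and `tokio-util` as dependencies; the types and phase bodies are illustrative, not the storage controller's):

    use tokio_util::sync::CancellationToken;

    #[derive(Debug)]
    struct OperationError(&'static str);

    // Stand-in for a drain phase: a real one iterates over shards on the
    // node, checking the token between moves so cancellation is prompt.
    async fn drain_phase(
        name: &'static str,
        cancel: &CancellationToken,
    ) -> Result<(), OperationError> {
        if cancel.is_cancelled() {
            return Err(OperationError(name));
        }
        println!("{name}: completed");
        Ok(())
    }

    async fn drain_node(
        drain_all: bool,
        cancel: CancellationToken,
    ) -> Result<(), OperationError> {
        drain_phase("primary drain", &cancel).await?;
        if drain_all {
            // Only a full drain (e.g. node decommission) touches secondaries.
            drain_phase("secondary drain", &cancel).await?;
        }
        Ok(())
    }

    #[tokio::main]
    async fn main() {
        // drain_all = false matches the pre-commit behavior: primaries only.
        drain_node(false, CancellationToken::new()).await.unwrap();
        drain_node(true, CancellationToken::new()).await.unwrap();
    }
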
@@ -8996,6 +9007,14 @@ impl Service {
         Ok(())
     }

+    async fn drain_secondary_attachments(
+        self: &Arc<Self>,
+        node_id: NodeId,
+        cancel: CancellationToken,
+    ) -> Result<(), OperationError> {
+        unimplemented!();
+    }
+
     /// Create a node fill plan (pick secondaries to promote), based on:
     /// 1. Shards which have a secondary on this node, and this node is in their home AZ, and are currently attached to a node
     /// outside their home AZ, should be migrated back here.