mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-05 06:20:37 +00:00
controller: add storcon-cli (#7114)
## Problem During incidents, we may need to quickly access the storage controller's API without trying API client code or crafting `curl` CLIs on the fly. A basic CLI client is needed for this. ## Summary of changes - Update storage controller node listing API to only use public types in controller_api.rs - Add a storage controller API for listing tenants - Add a basic test that the CLI can list and modify nodes and tenants.
This commit is contained in:
21
Cargo.lock
generated
21
Cargo.lock
generated
@@ -288,6 +288,7 @@ dependencies = [
|
||||
"hex",
|
||||
"humantime",
|
||||
"hyper",
|
||||
"itertools",
|
||||
"lasso",
|
||||
"measured",
|
||||
"metrics",
|
||||
@@ -5622,6 +5623,26 @@ dependencies = [
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "storcon_cli"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"clap",
|
||||
"comfy-table",
|
||||
"hyper",
|
||||
"pageserver_api",
|
||||
"pageserver_client",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "stringprep"
|
||||
version = "0.1.2"
|
||||
|
||||
@@ -4,6 +4,7 @@ members = [
|
||||
"compute_tools",
|
||||
"control_plane",
|
||||
"control_plane/attachment_service",
|
||||
"control_plane/storcon_cli",
|
||||
"pageserver",
|
||||
"pageserver/compaction",
|
||||
"pageserver/ctl",
|
||||
|
||||
@@ -25,6 +25,7 @@ git-version.workspace = true
|
||||
hex.workspace = true
|
||||
hyper.workspace = true
|
||||
humantime.workspace = true
|
||||
itertools.workspace = true
|
||||
lasso.workspace = true
|
||||
once_cell.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
|
||||
@@ -399,6 +399,15 @@ async fn handle_tenant_describe(
|
||||
json_response(StatusCode::OK, service.tenant_describe(tenant_id)?)
|
||||
}
|
||||
|
||||
async fn handle_tenant_list(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
json_response(StatusCode::OK, service.tenant_list())
|
||||
}
|
||||
|
||||
async fn handle_node_register(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
@@ -412,7 +421,10 @@ async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let state = get_state(&req);
|
||||
json_response(StatusCode::OK, state.service.node_list().await?)
|
||||
let nodes = state.service.node_list().await?;
|
||||
let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();
|
||||
|
||||
json_response(StatusCode::OK, api_nodes)
|
||||
}
|
||||
|
||||
async fn handle_node_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
@@ -793,6 +805,9 @@ pub fn make_router(
|
||||
RequestName("control_v1_tenant_describe"),
|
||||
)
|
||||
})
|
||||
.get("/control/v1/tenant", |r| {
|
||||
tenant_service_handler(r, handle_tenant_list, RequestName("control_v1_tenant_list"))
|
||||
})
|
||||
.put("/control/v1/tenant/:tenant_id/policy", |r| {
|
||||
named_request_span(
|
||||
r,
|
||||
|
||||
@@ -3,7 +3,8 @@ use std::{str::FromStr, time::Duration};
|
||||
use hyper::StatusCode;
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, TenantLocateResponseShard,
|
||||
NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy,
|
||||
TenantLocateResponseShard,
|
||||
},
|
||||
shard::TenantShardId,
|
||||
};
|
||||
@@ -256,6 +257,19 @@ impl Node {
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Generate the simplified API-friendly description of a node's state
|
||||
pub(crate) fn describe(&self) -> NodeDescribeResponse {
|
||||
NodeDescribeResponse {
|
||||
id: self.id,
|
||||
availability: self.availability.into(),
|
||||
scheduling: self.scheduling,
|
||||
listen_http_addr: self.listen_http_addr.clone(),
|
||||
listen_http_port: self.listen_http_port,
|
||||
listen_pg_addr: self.listen_pg_addr.clone(),
|
||||
listen_pg_port: self.listen_pg_port,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Node {
|
||||
|
||||
@@ -20,6 +20,7 @@ use control_plane::storage_controller::{
|
||||
use diesel::result::DatabaseErrorKind;
|
||||
use futures::{stream::FuturesUnordered, StreamExt};
|
||||
use hyper::StatusCode;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
|
||||
@@ -2735,47 +2736,73 @@ impl Service {
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn tenant_describe(
|
||||
/// Returns None if the input iterator of shards does not include a shard with number=0
|
||||
fn tenant_describe_impl<'a>(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<TenantDescribeResponse, ApiError> {
|
||||
let locked = self.inner.read().unwrap();
|
||||
|
||||
shards: impl Iterator<Item = &'a TenantState>,
|
||||
) -> Option<TenantDescribeResponse> {
|
||||
let mut shard_zero = None;
|
||||
let mut shards = Vec::new();
|
||||
let mut describe_shards = Vec::new();
|
||||
|
||||
for (tenant_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id))
|
||||
{
|
||||
if tenant_shard_id.is_zero() {
|
||||
for shard in shards {
|
||||
if shard.tenant_shard_id.is_zero() {
|
||||
shard_zero = Some(shard);
|
||||
}
|
||||
|
||||
let response_shard = TenantDescribeResponseShard {
|
||||
tenant_shard_id: *tenant_shard_id,
|
||||
describe_shards.push(TenantDescribeResponseShard {
|
||||
tenant_shard_id: shard.tenant_shard_id,
|
||||
node_attached: *shard.intent.get_attached(),
|
||||
node_secondary: shard.intent.get_secondary().to_vec(),
|
||||
last_error: shard.last_error.lock().unwrap().clone(),
|
||||
is_reconciling: shard.reconciler.is_some(),
|
||||
is_pending_compute_notification: shard.pending_compute_notification,
|
||||
is_splitting: matches!(shard.splitting, SplitState::Splitting),
|
||||
};
|
||||
shards.push(response_shard);
|
||||
scheduling_policy: *shard.get_scheduling_policy(),
|
||||
})
|
||||
}
|
||||
|
||||
let Some(shard_zero) = shard_zero else {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant {tenant_id} not found").into(),
|
||||
));
|
||||
};
|
||||
let shard_zero = shard_zero?;
|
||||
|
||||
Ok(TenantDescribeResponse {
|
||||
shards,
|
||||
Some(TenantDescribeResponse {
|
||||
tenant_id: shard_zero.tenant_shard_id.tenant_id,
|
||||
shards: describe_shards,
|
||||
stripe_size: shard_zero.shard.stripe_size,
|
||||
policy: shard_zero.policy.clone(),
|
||||
config: shard_zero.config.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn tenant_describe(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<TenantDescribeResponse, ApiError> {
|
||||
let locked = self.inner.read().unwrap();
|
||||
|
||||
self.tenant_describe_impl(
|
||||
locked
|
||||
.tenants
|
||||
.range(TenantShardId::tenant_range(tenant_id))
|
||||
.map(|(_k, v)| v),
|
||||
)
|
||||
.ok_or_else(|| ApiError::NotFound(anyhow::anyhow!("Tenant {tenant_id} not found").into()))
|
||||
}
|
||||
|
||||
pub(crate) fn tenant_list(&self) -> Vec<TenantDescribeResponse> {
|
||||
let locked = self.inner.read().unwrap();
|
||||
|
||||
let mut result = Vec::new();
|
||||
for (_tenant_id, tenant_shards) in
|
||||
&locked.tenants.iter().group_by(|(id, _shard)| id.tenant_id)
|
||||
{
|
||||
result.push(
|
||||
self.tenant_describe_impl(tenant_shards.map(|(_k, v)| v))
|
||||
.expect("Groups are always non-empty"),
|
||||
);
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id=%op.tenant_id))]
|
||||
async fn abort_tenant_shard_split(
|
||||
&self,
|
||||
|
||||
@@ -14,9 +14,7 @@ use control_plane::pageserver::{PageServerNode, PAGESERVER_REMOTE_STORAGE_DIR};
|
||||
use control_plane::safekeeper::SafekeeperNode;
|
||||
use control_plane::storage_controller::StorageController;
|
||||
use control_plane::{broker, local_env};
|
||||
use pageserver_api::controller_api::{
|
||||
NodeAvailability, NodeConfigureRequest, NodeSchedulingPolicy, PlacementPolicy,
|
||||
};
|
||||
use pageserver_api::controller_api::PlacementPolicy;
|
||||
use pageserver_api::models::{
|
||||
ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo,
|
||||
};
|
||||
@@ -1060,21 +1058,6 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
|
||||
}
|
||||
}
|
||||
|
||||
Some(("set-state", subcommand_args)) => {
|
||||
let pageserver = get_pageserver(env, subcommand_args)?;
|
||||
let scheduling = subcommand_args.get_one("scheduling");
|
||||
let availability = subcommand_args.get_one("availability");
|
||||
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
storage_controller
|
||||
.node_configure(NodeConfigureRequest {
|
||||
node_id: pageserver.conf.id,
|
||||
scheduling: scheduling.cloned(),
|
||||
availability: availability.cloned(),
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
Some(("status", subcommand_args)) => {
|
||||
match get_pageserver(env, subcommand_args)?.check_status().await {
|
||||
Ok(_) => println!("Page server is up and running"),
|
||||
@@ -1515,12 +1498,6 @@ fn cli() -> Command {
|
||||
.about("Restart local pageserver")
|
||||
.arg(pageserver_config_args.clone())
|
||||
)
|
||||
.subcommand(Command::new("set-state")
|
||||
.arg(Arg::new("availability").value_parser(value_parser!(NodeAvailability)).long("availability").action(ArgAction::Set).help("Availability state: offline,active"))
|
||||
.arg(Arg::new("scheduling").value_parser(value_parser!(NodeSchedulingPolicy)).long("scheduling").action(ArgAction::Set).help("Scheduling state: draining,pause,filling,active"))
|
||||
.about("Set scheduling or availability state of pageserver node")
|
||||
.arg(pageserver_config_args.clone())
|
||||
)
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("storage_controller")
|
||||
|
||||
23
control_plane/storcon_cli/Cargo.toml
Normal file
23
control_plane/storcon_cli/Cargo.toml
Normal file
@@ -0,0 +1,23 @@
|
||||
[package]
|
||||
name = "storcon_cli"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
clap.workspace = true
|
||||
comfy-table.workspace = true
|
||||
hyper.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_client.workspace = true
|
||||
reqwest.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json = { workspace = true, features = ["raw_value"] }
|
||||
thiserror.workspace = true
|
||||
tokio.workspace = true
|
||||
tracing.workspace = true
|
||||
utils.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
587
control_plane/storcon_cli/src/main.rs
Normal file
587
control_plane/storcon_cli/src/main.rs
Normal file
@@ -0,0 +1,587 @@
|
||||
use std::{collections::HashMap, str::FromStr};
|
||||
|
||||
use clap::{Parser, Subcommand};
|
||||
use hyper::Method;
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
NodeAvailabilityWrapper, NodeDescribeResponse, ShardSchedulingPolicy,
|
||||
TenantDescribeResponse, TenantPolicyRequest,
|
||||
},
|
||||
models::{
|
||||
ShardParameters, TenantConfig, TenantConfigRequest, TenantCreateRequest,
|
||||
TenantShardSplitRequest, TenantShardSplitResponse,
|
||||
},
|
||||
shard::{ShardStripeSize, TenantShardId},
|
||||
};
|
||||
use pageserver_client::mgmt_api::{self, ResponseErrorMessageExt};
|
||||
use reqwest::Url;
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use utils::id::{NodeId, TenantId};
|
||||
|
||||
use pageserver_api::controller_api::{
|
||||
NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
|
||||
TenantLocateResponse, TenantShardMigrateRequest, TenantShardMigrateResponse,
|
||||
};
|
||||
|
||||
#[derive(Subcommand, Debug)]
|
||||
enum Command {
|
||||
/// Register a pageserver with the storage controller. This shouldn't usually be necessary,
|
||||
/// since pageservers auto-register when they start up
|
||||
NodeRegister {
|
||||
#[arg(long)]
|
||||
node_id: NodeId,
|
||||
|
||||
#[arg(long)]
|
||||
listen_pg_addr: String,
|
||||
#[arg(long)]
|
||||
listen_pg_port: u16,
|
||||
|
||||
#[arg(long)]
|
||||
listen_http_addr: String,
|
||||
#[arg(long)]
|
||||
listen_http_port: u16,
|
||||
},
|
||||
|
||||
/// Modify a node's configuration in the storage controller
|
||||
NodeConfigure {
|
||||
#[arg(long)]
|
||||
node_id: NodeId,
|
||||
|
||||
/// Availability is usually auto-detected based on heartbeats. Set 'offline' here to
|
||||
/// manually mark a node offline
|
||||
#[arg(long)]
|
||||
availability: Option<NodeAvailabilityArg>,
|
||||
/// Scheduling policy controls whether tenant shards may be scheduled onto this node.
|
||||
#[arg(long)]
|
||||
scheduling: Option<NodeSchedulingPolicy>,
|
||||
},
|
||||
/// Modify a tenant's policies in the storage controller
|
||||
TenantPolicy {
|
||||
#[arg(long)]
|
||||
tenant_id: TenantId,
|
||||
/// Placement policy controls whether a tenant is `detached`, has only a secondary location (`secondary`),
|
||||
/// or is in the normal attached state with N secondary locations (`attached:N`)
|
||||
#[arg(long)]
|
||||
placement: Option<PlacementPolicyArg>,
|
||||
/// Scheduling policy enables pausing the controller's scheduling activity involving this tenant. `active` is normal,
|
||||
/// `essential` disables optimization scheduling changes, `pause` disables all scheduling changes, and `stop` prevents
|
||||
/// all reconciliation activity including for scheduling changes already made. `pause` and `stop` can make a tenant
|
||||
/// unavailable, and are only for use in emergencies.
|
||||
#[arg(long)]
|
||||
scheduling: Option<ShardSchedulingPolicyArg>,
|
||||
},
|
||||
/// List nodes known to the storage controller
|
||||
Nodes {},
|
||||
/// List tenants known to the storage controller
|
||||
Tenants {},
|
||||
/// Create a new tenant in the storage controller, and by extension on pageservers.
|
||||
TenantCreate {
|
||||
#[arg(long)]
|
||||
tenant_id: TenantId,
|
||||
},
|
||||
/// Delete a tenant in the storage controller, and by extension on pageservers.
|
||||
TenantDelete {
|
||||
#[arg(long)]
|
||||
tenant_id: TenantId,
|
||||
},
|
||||
/// Split an existing tenant into a higher number of shards than its current shard count.
|
||||
TenantShardSplit {
|
||||
#[arg(long)]
|
||||
tenant_id: TenantId,
|
||||
#[arg(long)]
|
||||
shard_count: u8,
|
||||
/// Optional, in 8kiB pages. e.g. set 2048 for 16MB stripes.
|
||||
#[arg(long)]
|
||||
stripe_size: Option<u32>,
|
||||
},
|
||||
/// Migrate the attached location for a tenant shard to a specific pageserver.
|
||||
TenantShardMigrate {
|
||||
#[arg(long)]
|
||||
tenant_shard_id: TenantShardId,
|
||||
#[arg(long)]
|
||||
node: NodeId,
|
||||
},
|
||||
/// Modify the pageserver tenant configuration of a tenant: this is the configuration structure
|
||||
/// that is passed through to pageservers, and does not affect storage controller behavior.
|
||||
TenantConfig {
|
||||
#[arg(long)]
|
||||
tenant_id: TenantId,
|
||||
#[arg(long)]
|
||||
config: String,
|
||||
},
|
||||
/// Attempt to balance the locations for a tenant across pageservers. This is a client-side
|
||||
/// alternative to the storage controller's scheduling optimization behavior.
|
||||
TenantScatter {
|
||||
#[arg(long)]
|
||||
tenant_id: TenantId,
|
||||
},
|
||||
/// Print details about a particular tenant, including all its shards' states.
|
||||
TenantDescribe {
|
||||
#[arg(long)]
|
||||
tenant_id: TenantId,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(
|
||||
author,
|
||||
version,
|
||||
about,
|
||||
long_about = "CLI for Storage Controller Support/Debug"
|
||||
)]
|
||||
#[command(arg_required_else_help(true))]
|
||||
struct Cli {
|
||||
#[arg(long)]
|
||||
/// URL to storage controller. e.g. http://127.0.0.1:1234 when using `neon_local`
|
||||
api: Url,
|
||||
|
||||
#[arg(long)]
|
||||
/// JWT token for authenticating with storage controller. Depending on the API used, this
|
||||
/// should have either `pageserverapi` or `admin` scopes: for convenience, you should mint
|
||||
/// a token with both scopes to use with this tool.
|
||||
jwt: Option<String>,
|
||||
|
||||
#[command(subcommand)]
|
||||
command: Command,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct PlacementPolicyArg(PlacementPolicy);
|
||||
|
||||
impl FromStr for PlacementPolicyArg {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s {
|
||||
"detached" => Ok(Self(PlacementPolicy::Detached)),
|
||||
"secondary" => Ok(Self(PlacementPolicy::Secondary)),
|
||||
_ if s.starts_with("attached:") => {
|
||||
let mut splitter = s.split(':');
|
||||
let _prefix = splitter.next().unwrap();
|
||||
match splitter.next().and_then(|s| s.parse::<usize>().ok()) {
|
||||
Some(n) => Ok(Self(PlacementPolicy::Attached(n))),
|
||||
None => Err(anyhow::anyhow!(
|
||||
"Invalid format '{s}', a valid example is 'attached:1'"
|
||||
)),
|
||||
}
|
||||
}
|
||||
_ => Err(anyhow::anyhow!(
|
||||
"Unknown placement policy '{s}', try detached,secondary,attached:<n>"
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct ShardSchedulingPolicyArg(ShardSchedulingPolicy);
|
||||
|
||||
impl FromStr for ShardSchedulingPolicyArg {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s {
|
||||
"active" => Ok(Self(ShardSchedulingPolicy::Active)),
|
||||
"essential" => Ok(Self(ShardSchedulingPolicy::Essential)),
|
||||
"pause" => Ok(Self(ShardSchedulingPolicy::Pause)),
|
||||
"stop" => Ok(Self(ShardSchedulingPolicy::Stop)),
|
||||
_ => Err(anyhow::anyhow!(
|
||||
"Unknown scheduling policy '{s}', try active,essential,pause,stop"
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct NodeAvailabilityArg(NodeAvailabilityWrapper);
|
||||
|
||||
impl FromStr for NodeAvailabilityArg {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s {
|
||||
"active" => Ok(Self(NodeAvailabilityWrapper::Active)),
|
||||
"offline" => Ok(Self(NodeAvailabilityWrapper::Offline)),
|
||||
_ => Err(anyhow::anyhow!("Unknown availability state '{s}'")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct Client {
|
||||
base_url: Url,
|
||||
jwt_token: Option<String>,
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
fn new(base_url: Url, jwt_token: Option<String>) -> Self {
|
||||
Self {
|
||||
base_url,
|
||||
jwt_token,
|
||||
client: reqwest::ClientBuilder::new()
|
||||
.build()
|
||||
.expect("Failed to construct http client"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Simple HTTP request wrapper for calling into attachment service
|
||||
async fn dispatch<RQ, RS>(
|
||||
&self,
|
||||
method: hyper::Method,
|
||||
path: String,
|
||||
body: Option<RQ>,
|
||||
) -> mgmt_api::Result<RS>
|
||||
where
|
||||
RQ: Serialize + Sized,
|
||||
RS: DeserializeOwned + Sized,
|
||||
{
|
||||
// The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
|
||||
// for general purpose API access.
|
||||
let url = Url::from_str(&format!(
|
||||
"http://{}:{}/{path}",
|
||||
self.base_url.host_str().unwrap(),
|
||||
self.base_url.port().unwrap()
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let mut builder = self.client.request(method, url);
|
||||
if let Some(body) = body {
|
||||
builder = builder.json(&body)
|
||||
}
|
||||
if let Some(jwt_token) = &self.jwt_token {
|
||||
builder = builder.header(
|
||||
reqwest::header::AUTHORIZATION,
|
||||
format!("Bearer {jwt_token}"),
|
||||
);
|
||||
}
|
||||
|
||||
let response = builder.send().await.map_err(mgmt_api::Error::ReceiveBody)?;
|
||||
let response = response.error_from_body().await?;
|
||||
|
||||
response
|
||||
.json()
|
||||
.await
|
||||
.map_err(pageserver_client::mgmt_api::Error::ReceiveBody)
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let cli = Cli::parse();
|
||||
|
||||
let storcon_client = Client::new(cli.api.clone(), cli.jwt.clone());
|
||||
|
||||
let mut trimmed = cli.api.to_string();
|
||||
trimmed.pop();
|
||||
let vps_client = mgmt_api::Client::new(trimmed, cli.jwt.as_deref());
|
||||
|
||||
match cli.command {
|
||||
Command::NodeRegister {
|
||||
node_id,
|
||||
listen_pg_addr,
|
||||
listen_pg_port,
|
||||
listen_http_addr,
|
||||
listen_http_port,
|
||||
} => {
|
||||
storcon_client
|
||||
.dispatch::<_, ()>(
|
||||
Method::POST,
|
||||
"control/v1/node".to_string(),
|
||||
Some(NodeRegisterRequest {
|
||||
node_id,
|
||||
listen_pg_addr,
|
||||
listen_pg_port,
|
||||
listen_http_addr,
|
||||
listen_http_port,
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Command::TenantCreate { tenant_id } => {
|
||||
vps_client
|
||||
.tenant_create(&TenantCreateRequest {
|
||||
new_tenant_id: TenantShardId::unsharded(tenant_id),
|
||||
generation: None,
|
||||
shard_parameters: ShardParameters::default(),
|
||||
placement_policy: Some(PlacementPolicy::Attached(1)),
|
||||
config: TenantConfig::default(),
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
Command::TenantDelete { tenant_id } => {
|
||||
let status = vps_client
|
||||
.tenant_delete(TenantShardId::unsharded(tenant_id))
|
||||
.await?;
|
||||
tracing::info!("Delete status: {}", status);
|
||||
}
|
||||
Command::Nodes {} => {
|
||||
let resp = storcon_client
|
||||
.dispatch::<(), Vec<NodeDescribeResponse>>(
|
||||
Method::GET,
|
||||
"control/v1/node".to_string(),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
let mut table = comfy_table::Table::new();
|
||||
table.set_header(["Id", "Hostname", "Scheduling", "Availability"]);
|
||||
for node in resp {
|
||||
table.add_row([
|
||||
format!("{}", node.id),
|
||||
node.listen_http_addr,
|
||||
format!("{:?}", node.scheduling),
|
||||
format!("{:?}", node.availability),
|
||||
]);
|
||||
}
|
||||
println!("{table}");
|
||||
}
|
||||
Command::NodeConfigure {
|
||||
node_id,
|
||||
availability,
|
||||
scheduling,
|
||||
} => {
|
||||
let req = NodeConfigureRequest {
|
||||
node_id,
|
||||
availability: availability.map(|a| a.0),
|
||||
scheduling,
|
||||
};
|
||||
storcon_client
|
||||
.dispatch::<_, ()>(
|
||||
Method::PUT,
|
||||
format!("control/v1/node/{node_id}/config"),
|
||||
Some(req),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Command::Tenants {} => {
|
||||
let resp = storcon_client
|
||||
.dispatch::<(), Vec<TenantDescribeResponse>>(
|
||||
Method::GET,
|
||||
"control/v1/tenant".to_string(),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
let mut table = comfy_table::Table::new();
|
||||
table.set_header([
|
||||
"TenantId",
|
||||
"ShardCount",
|
||||
"StripeSize",
|
||||
"Placement",
|
||||
"Scheduling",
|
||||
]);
|
||||
for tenant in resp {
|
||||
let shard_zero = tenant.shards.into_iter().next().unwrap();
|
||||
table.add_row([
|
||||
format!("{}", tenant.tenant_id),
|
||||
format!("{}", shard_zero.tenant_shard_id.shard_count.literal()),
|
||||
format!("{:?}", tenant.stripe_size),
|
||||
format!("{:?}", tenant.policy),
|
||||
format!("{:?}", shard_zero.scheduling_policy),
|
||||
]);
|
||||
}
|
||||
|
||||
println!("{table}");
|
||||
}
|
||||
Command::TenantPolicy {
|
||||
tenant_id,
|
||||
placement,
|
||||
scheduling,
|
||||
} => {
|
||||
let req = TenantPolicyRequest {
|
||||
scheduling: scheduling.map(|s| s.0),
|
||||
placement: placement.map(|p| p.0),
|
||||
};
|
||||
storcon_client
|
||||
.dispatch::<_, ()>(
|
||||
Method::PUT,
|
||||
format!("control/v1/tenant/{tenant_id}/policy"),
|
||||
Some(req),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Command::TenantShardSplit {
|
||||
tenant_id,
|
||||
shard_count,
|
||||
stripe_size,
|
||||
} => {
|
||||
let req = TenantShardSplitRequest {
|
||||
new_shard_count: shard_count,
|
||||
new_stripe_size: stripe_size.map(ShardStripeSize),
|
||||
};
|
||||
|
||||
let response = storcon_client
|
||||
.dispatch::<TenantShardSplitRequest, TenantShardSplitResponse>(
|
||||
Method::PUT,
|
||||
format!("control/v1/tenant/{tenant_id}/shard_split"),
|
||||
Some(req),
|
||||
)
|
||||
.await?;
|
||||
println!(
|
||||
"Split tenant {} into {} shards: {}",
|
||||
tenant_id,
|
||||
shard_count,
|
||||
response
|
||||
.new_shards
|
||||
.iter()
|
||||
.map(|s| format!("{:?}", s))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",")
|
||||
);
|
||||
}
|
||||
Command::TenantShardMigrate {
|
||||
tenant_shard_id,
|
||||
node,
|
||||
} => {
|
||||
let req = TenantShardMigrateRequest {
|
||||
tenant_shard_id,
|
||||
node_id: node,
|
||||
};
|
||||
|
||||
storcon_client
|
||||
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
|
||||
Method::PUT,
|
||||
format!("control/v1/tenant/{tenant_shard_id}/migrate"),
|
||||
Some(req),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Command::TenantConfig { tenant_id, config } => {
|
||||
let tenant_conf = serde_json::from_str(&config)?;
|
||||
|
||||
vps_client
|
||||
.tenant_config(&TenantConfigRequest {
|
||||
tenant_id,
|
||||
config: tenant_conf,
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
Command::TenantScatter { tenant_id } => {
|
||||
// Find the shards
|
||||
let locate_response = storcon_client
|
||||
.dispatch::<(), TenantLocateResponse>(
|
||||
Method::GET,
|
||||
format!("control/v1/tenant/{tenant_id}/locate"),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
let shards = locate_response.shards;
|
||||
|
||||
let mut node_to_shards: HashMap<NodeId, Vec<TenantShardId>> = HashMap::new();
|
||||
let shard_count = shards.len();
|
||||
for s in shards {
|
||||
let entry = node_to_shards.entry(s.node_id).or_default();
|
||||
entry.push(s.shard_id);
|
||||
}
|
||||
|
||||
// Load list of available nodes
|
||||
let nodes_resp = storcon_client
|
||||
.dispatch::<(), Vec<NodeDescribeResponse>>(
|
||||
Method::GET,
|
||||
"control/v1/node".to_string(),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
for node in nodes_resp {
|
||||
if matches!(node.availability, NodeAvailabilityWrapper::Active) {
|
||||
node_to_shards.entry(node.id).or_default();
|
||||
}
|
||||
}
|
||||
|
||||
let max_shard_per_node = shard_count / node_to_shards.len();
|
||||
|
||||
loop {
|
||||
let mut migrate_shard = None;
|
||||
for shards in node_to_shards.values_mut() {
|
||||
if shards.len() > max_shard_per_node {
|
||||
// Pick the emptiest
|
||||
migrate_shard = Some(shards.pop().unwrap());
|
||||
}
|
||||
}
|
||||
let Some(migrate_shard) = migrate_shard else {
|
||||
break;
|
||||
};
|
||||
|
||||
// Pick the emptiest node to migrate to
|
||||
let mut destinations = node_to_shards
|
||||
.iter()
|
||||
.map(|(k, v)| (k, v.len()))
|
||||
.collect::<Vec<_>>();
|
||||
destinations.sort_by_key(|i| i.1);
|
||||
let (destination_node, destination_count) = *destinations.first().unwrap();
|
||||
if destination_count + 1 > max_shard_per_node {
|
||||
// Even the emptiest destination doesn't have space: we're done
|
||||
break;
|
||||
}
|
||||
let destination_node = *destination_node;
|
||||
|
||||
node_to_shards
|
||||
.get_mut(&destination_node)
|
||||
.unwrap()
|
||||
.push(migrate_shard);
|
||||
|
||||
println!("Migrate {} -> {} ...", migrate_shard, destination_node);
|
||||
|
||||
storcon_client
|
||||
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
|
||||
Method::PUT,
|
||||
format!("control/v1/tenant/{migrate_shard}/migrate"),
|
||||
Some(TenantShardMigrateRequest {
|
||||
tenant_shard_id: migrate_shard,
|
||||
node_id: destination_node,
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
println!("Migrate {} -> {} OK", migrate_shard, destination_node);
|
||||
}
|
||||
|
||||
// Spread the shards across the nodes
|
||||
}
|
||||
Command::TenantDescribe { tenant_id } => {
|
||||
let describe_response = storcon_client
|
||||
.dispatch::<(), TenantDescribeResponse>(
|
||||
Method::GET,
|
||||
format!("control/v1/tenant/{tenant_id}"),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
let shards = describe_response.shards;
|
||||
let mut table = comfy_table::Table::new();
|
||||
table.set_header(["Shard", "Attached", "Secondary", "Last error", "status"]);
|
||||
for shard in shards {
|
||||
let secondary = shard
|
||||
.node_secondary
|
||||
.iter()
|
||||
.map(|n| format!("{}", n))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",");
|
||||
|
||||
let mut status_parts = Vec::new();
|
||||
if shard.is_reconciling {
|
||||
status_parts.push("reconciling");
|
||||
}
|
||||
|
||||
if shard.is_pending_compute_notification {
|
||||
status_parts.push("pending_compute");
|
||||
}
|
||||
|
||||
if shard.is_splitting {
|
||||
status_parts.push("splitting");
|
||||
}
|
||||
let status = status_parts.join(",");
|
||||
|
||||
table.add_row([
|
||||
format!("{}", shard.tenant_shard_id),
|
||||
shard
|
||||
.node_attached
|
||||
.map(|n| format!("{}", n))
|
||||
.unwrap_or(String::new()),
|
||||
secondary,
|
||||
shard.last_error,
|
||||
status,
|
||||
]);
|
||||
}
|
||||
println!("{table}");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -4,7 +4,7 @@ use std::str::FromStr;
|
||||
/// API (`/control/v1` prefix). Implemented by the server
|
||||
/// in [`attachment_service::http`]
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::id::NodeId;
|
||||
use utils::id::{NodeId, TenantId};
|
||||
|
||||
use crate::{
|
||||
models::{ShardParameters, TenantConfig},
|
||||
@@ -68,12 +68,27 @@ pub struct TenantLocateResponse {
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct TenantDescribeResponse {
|
||||
pub tenant_id: TenantId,
|
||||
pub shards: Vec<TenantDescribeResponseShard>,
|
||||
pub stripe_size: ShardStripeSize,
|
||||
pub policy: PlacementPolicy,
|
||||
pub config: TenantConfig,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct NodeDescribeResponse {
|
||||
pub id: NodeId,
|
||||
|
||||
pub availability: NodeAvailabilityWrapper,
|
||||
pub scheduling: NodeSchedulingPolicy,
|
||||
|
||||
pub listen_http_addr: String,
|
||||
pub listen_http_port: u16,
|
||||
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_pg_port: u16,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct TenantDescribeResponseShard {
|
||||
pub tenant_shard_id: TenantShardId,
|
||||
@@ -89,6 +104,8 @@ pub struct TenantDescribeResponseShard {
|
||||
pub is_pending_compute_notification: bool,
|
||||
/// A shard split is currently underway
|
||||
pub is_splitting: bool,
|
||||
|
||||
pub scheduling_policy: ShardSchedulingPolicy,
|
||||
}
|
||||
|
||||
/// Explicitly migrating a particular shard is a low level operation
|
||||
@@ -103,7 +120,7 @@ pub struct TenantShardMigrateRequest {
|
||||
/// Utilisation score indicating how good a candidate a pageserver
|
||||
/// is for scheduling the next tenant. See [`crate::models::PageserverUtilization`].
|
||||
/// Lower values are better.
|
||||
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, PartialOrd, Ord)]
|
||||
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Debug)]
|
||||
pub struct UtilizationScore(pub u64);
|
||||
|
||||
impl UtilizationScore {
|
||||
@@ -112,7 +129,7 @@ impl UtilizationScore {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone, Copy)]
|
||||
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
|
||||
#[serde(into = "NodeAvailabilityWrapper")]
|
||||
pub enum NodeAvailability {
|
||||
// Normal, happy state
|
||||
@@ -135,7 +152,7 @@ impl Eq for NodeAvailability {}
|
||||
// This wrapper provides serde functionality and it should only be used to
|
||||
// communicate with external callers which don't know or care about the
|
||||
// utilisation score of the pageserver it is targeting.
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
|
||||
pub enum NodeAvailabilityWrapper {
|
||||
Active,
|
||||
Offline,
|
||||
@@ -161,21 +178,6 @@ impl From<NodeAvailability> for NodeAvailabilityWrapper {
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for NodeAvailability {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s {
|
||||
// This is used when parsing node configuration requests from neon-local.
|
||||
// Assume the worst possible utilisation score
|
||||
// and let it get updated via the heartbeats.
|
||||
"active" => Ok(Self::Active(UtilizationScore::worst())),
|
||||
"offline" => Ok(Self::Offline),
|
||||
_ => Err(anyhow::anyhow!("Unknown availability state '{s}'")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
|
||||
pub enum ShardSchedulingPolicy {
|
||||
// Normal mode: the tenant's scheduled locations may be updated at will, including
|
||||
@@ -202,7 +204,7 @@ impl Default for ShardSchedulingPolicy {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq)]
|
||||
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
|
||||
pub enum NodeSchedulingPolicy {
|
||||
Active,
|
||||
Filling,
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import json
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timezone
|
||||
@@ -24,7 +25,7 @@ from fixtures.pageserver.utils import (
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.remote_storage import RemoteStorageKind, s3_storage
|
||||
from fixtures.types import TenantId, TenantShardId, TimelineId
|
||||
from fixtures.utils import run_pg_bench_small, wait_until
|
||||
from fixtures.utils import run_pg_bench_small, subprocess_capture, wait_until
|
||||
from mypy_boto3_s3.type_defs import (
|
||||
ObjectTypeDef,
|
||||
)
|
||||
@@ -1131,3 +1132,89 @@ def test_storage_controller_shard_scheduling_policy(neon_env_builder: NeonEnvBui
|
||||
|
||||
# And indeed the tenant should be attached
|
||||
assert len(env.pageserver.http_client().tenant_list_locations()["tenant_shards"]) == 1
|
||||
|
||||
|
||||
def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
The storage controller command line interface (storcon-cli) is an internal tool. Most tests
|
||||
just use the APIs directly: this test exercises some basics of the CLI as a regression test
|
||||
that the client remains usable as the server evolves.
|
||||
"""
|
||||
output_dir = neon_env_builder.test_output_dir
|
||||
shard_count = 4
|
||||
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
|
||||
base_args = [env.neon_binpath / "storcon_cli", "--api", env.storage_controller_api]
|
||||
|
||||
def storcon_cli(args):
|
||||
"""
|
||||
CLI wrapper: returns stdout split into a list of non-empty strings
|
||||
"""
|
||||
(output_path, stdout, status_code) = subprocess_capture(
|
||||
output_dir,
|
||||
[str(s) for s in base_args + args],
|
||||
echo_stderr=True,
|
||||
echo_stdout=True,
|
||||
env={},
|
||||
check=False,
|
||||
capture_stdout=True,
|
||||
timeout=10,
|
||||
)
|
||||
if status_code:
|
||||
log.warning(f"Command {args} failed")
|
||||
log.warning(f"Output at: {output_path}")
|
||||
|
||||
raise RuntimeError("CLI failure (check logs for stderr)")
|
||||
|
||||
assert stdout is not None
|
||||
return [line.strip() for line in stdout.split("\n") if line.strip()]
|
||||
|
||||
# List nodes
|
||||
node_lines = storcon_cli(["nodes"])
|
||||
# Table header, footer, and one line of data
|
||||
assert len(node_lines) == 5
|
||||
assert "localhost" in node_lines[3]
|
||||
|
||||
# Pause scheduling onto a node
|
||||
storcon_cli(["node-configure", "--node-id", "1", "--scheduling", "pause"])
|
||||
assert "Pause" in storcon_cli(["nodes"])[3]
|
||||
|
||||
# Make a node offline
|
||||
storcon_cli(["node-configure", "--node-id", "1", "--availability", "offline"])
|
||||
assert "Offline" in storcon_cli(["nodes"])[3]
|
||||
|
||||
# List tenants
|
||||
tenant_lines = storcon_cli(["tenants"])
|
||||
assert len(tenant_lines) == 5
|
||||
assert str(env.initial_tenant) in tenant_lines[3]
|
||||
|
||||
env.storage_controller.allowed_errors.append(".*Scheduling is disabled by policy.*")
|
||||
|
||||
# Describe a tenant
|
||||
tenant_lines = storcon_cli(["tenant-describe", "--tenant-id", str(env.initial_tenant)])
|
||||
assert len(tenant_lines) == 3 + shard_count * 2
|
||||
assert str(env.initial_tenant) in tenant_lines[3]
|
||||
|
||||
# Pause changes on a tenant
|
||||
storcon_cli(["tenant-policy", "--tenant-id", str(env.initial_tenant), "--scheduling", "stop"])
|
||||
assert "Stop" in storcon_cli(["tenants"])[3]
|
||||
|
||||
# Change a tenant's placement
|
||||
storcon_cli(
|
||||
["tenant-policy", "--tenant-id", str(env.initial_tenant), "--placement", "secondary"]
|
||||
)
|
||||
assert "Secondary" in storcon_cli(["tenants"])[3]
|
||||
|
||||
# Modify a tenant's config
|
||||
storcon_cli(
|
||||
[
|
||||
"tenant-config",
|
||||
"--tenant-id",
|
||||
str(env.initial_tenant),
|
||||
"--config",
|
||||
json.dumps({"pitr_interval": "1m"}),
|
||||
]
|
||||
)
|
||||
|
||||
# Quiesce any background reconciliation before doing consistency check
|
||||
env.storage_controller.reconcile_until_idle(timeout_secs=10)
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
Reference in New Issue
Block a user