diff --git a/Cargo.lock b/Cargo.lock index 92c07b0c6f..ecc69f7048 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -288,6 +288,7 @@ dependencies = [ "hex", "humantime", "hyper", + "itertools", "lasso", "measured", "metrics", @@ -5622,6 +5623,26 @@ dependencies = [ "workspace_hack", ] +[[package]] +name = "storcon_cli" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "comfy-table", + "hyper", + "pageserver_api", + "pageserver_client", + "reqwest", + "serde", + "serde_json", + "thiserror", + "tokio", + "tracing", + "utils", + "workspace_hack", +] + [[package]] name = "stringprep" version = "0.1.2" diff --git a/Cargo.toml b/Cargo.toml index 309ebbe119..9f24176c65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "compute_tools", "control_plane", "control_plane/attachment_service", + "control_plane/storcon_cli", "pageserver", "pageserver/compaction", "pageserver/ctl", diff --git a/control_plane/attachment_service/Cargo.toml b/control_plane/attachment_service/Cargo.toml index 0201e0ed86..595b091df4 100644 --- a/control_plane/attachment_service/Cargo.toml +++ b/control_plane/attachment_service/Cargo.toml @@ -25,6 +25,7 @@ git-version.workspace = true hex.workspace = true hyper.workspace = true humantime.workspace = true +itertools.workspace = true lasso.workspace = true once_cell.workspace = true pageserver_api.workspace = true diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs index 1f3f78bffa..03883f0ca2 100644 --- a/control_plane/attachment_service/src/http.rs +++ b/control_plane/attachment_service/src/http.rs @@ -399,6 +399,15 @@ async fn handle_tenant_describe( json_response(StatusCode::OK, service.tenant_describe(tenant_id)?) } +async fn handle_tenant_list( + service: Arc, + req: Request, +) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + + json_response(StatusCode::OK, service.tenant_list()) +} + async fn handle_node_register(mut req: Request) -> Result, ApiError> { check_permissions(&req, Scope::Admin)?; @@ -412,7 +421,10 @@ async fn handle_node_list(req: Request) -> Result, ApiError check_permissions(&req, Scope::Admin)?; let state = get_state(&req); - json_response(StatusCode::OK, state.service.node_list().await?) + let nodes = state.service.node_list().await?; + let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::>(); + + json_response(StatusCode::OK, api_nodes) } async fn handle_node_drop(req: Request) -> Result, ApiError> { @@ -793,6 +805,9 @@ pub fn make_router( RequestName("control_v1_tenant_describe"), ) }) + .get("/control/v1/tenant", |r| { + tenant_service_handler(r, handle_tenant_list, RequestName("control_v1_tenant_list")) + }) .put("/control/v1/tenant/:tenant_id/policy", |r| { named_request_span( r, diff --git a/control_plane/attachment_service/src/node.rs b/control_plane/attachment_service/src/node.rs index df40bff66f..7ba6828deb 100644 --- a/control_plane/attachment_service/src/node.rs +++ b/control_plane/attachment_service/src/node.rs @@ -3,7 +3,8 @@ use std::{str::FromStr, time::Duration}; use hyper::StatusCode; use pageserver_api::{ controller_api::{ - NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, TenantLocateResponseShard, + NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy, + TenantLocateResponseShard, }, shard::TenantShardId, }; @@ -256,6 +257,19 @@ impl Node { ) .await } + + /// Generate the simplified API-friendly description of a node's state + pub(crate) fn describe(&self) -> NodeDescribeResponse { + NodeDescribeResponse { + id: self.id, + availability: self.availability.into(), + scheduling: self.scheduling, + listen_http_addr: self.listen_http_addr.clone(), + listen_http_port: self.listen_http_port, + listen_pg_addr: self.listen_pg_addr.clone(), + listen_pg_port: self.listen_pg_port, + } + } } impl std::fmt::Display for Node { diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs index 7502d9d186..0b67e30b96 100644 --- a/control_plane/attachment_service/src/service.rs +++ b/control_plane/attachment_service/src/service.rs @@ -20,6 +20,7 @@ use control_plane::storage_controller::{ use diesel::result::DatabaseErrorKind; use futures::{stream::FuturesUnordered, StreamExt}; use hyper::StatusCode; +use itertools::Itertools; use pageserver_api::{ controller_api::{ NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy, @@ -2735,47 +2736,73 @@ impl Service { }) } - pub(crate) fn tenant_describe( + /// Returns None if the input iterator of shards does not include a shard with number=0 + fn tenant_describe_impl<'a>( &self, - tenant_id: TenantId, - ) -> Result { - let locked = self.inner.read().unwrap(); - + shards: impl Iterator, + ) -> Option { let mut shard_zero = None; - let mut shards = Vec::new(); + let mut describe_shards = Vec::new(); - for (tenant_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) - { - if tenant_shard_id.is_zero() { + for shard in shards { + if shard.tenant_shard_id.is_zero() { shard_zero = Some(shard); } - let response_shard = TenantDescribeResponseShard { - tenant_shard_id: *tenant_shard_id, + describe_shards.push(TenantDescribeResponseShard { + tenant_shard_id: shard.tenant_shard_id, node_attached: *shard.intent.get_attached(), node_secondary: shard.intent.get_secondary().to_vec(), last_error: shard.last_error.lock().unwrap().clone(), is_reconciling: shard.reconciler.is_some(), is_pending_compute_notification: shard.pending_compute_notification, is_splitting: matches!(shard.splitting, SplitState::Splitting), - }; - shards.push(response_shard); + scheduling_policy: *shard.get_scheduling_policy(), + }) } - let Some(shard_zero) = shard_zero else { - return Err(ApiError::NotFound( - anyhow::anyhow!("Tenant {tenant_id} not found").into(), - )); - }; + let shard_zero = shard_zero?; - Ok(TenantDescribeResponse { - shards, + Some(TenantDescribeResponse { + tenant_id: shard_zero.tenant_shard_id.tenant_id, + shards: describe_shards, stripe_size: shard_zero.shard.stripe_size, policy: shard_zero.policy.clone(), config: shard_zero.config.clone(), }) } + pub(crate) fn tenant_describe( + &self, + tenant_id: TenantId, + ) -> Result { + let locked = self.inner.read().unwrap(); + + self.tenant_describe_impl( + locked + .tenants + .range(TenantShardId::tenant_range(tenant_id)) + .map(|(_k, v)| v), + ) + .ok_or_else(|| ApiError::NotFound(anyhow::anyhow!("Tenant {tenant_id} not found").into())) + } + + pub(crate) fn tenant_list(&self) -> Vec { + let locked = self.inner.read().unwrap(); + + let mut result = Vec::new(); + for (_tenant_id, tenant_shards) in + &locked.tenants.iter().group_by(|(id, _shard)| id.tenant_id) + { + result.push( + self.tenant_describe_impl(tenant_shards.map(|(_k, v)| v)) + .expect("Groups are always non-empty"), + ); + } + + result + } + #[instrument(skip_all, fields(tenant_id=%op.tenant_id))] async fn abort_tenant_shard_split( &self, diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 401feae706..56495dd2da 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -14,9 +14,7 @@ use control_plane::pageserver::{PageServerNode, PAGESERVER_REMOTE_STORAGE_DIR}; use control_plane::safekeeper::SafekeeperNode; use control_plane::storage_controller::StorageController; use control_plane::{broker, local_env}; -use pageserver_api::controller_api::{ - NodeAvailability, NodeConfigureRequest, NodeSchedulingPolicy, PlacementPolicy, -}; +use pageserver_api::controller_api::PlacementPolicy; use pageserver_api::models::{ ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo, }; @@ -1060,21 +1058,6 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> } } - Some(("set-state", subcommand_args)) => { - let pageserver = get_pageserver(env, subcommand_args)?; - let scheduling = subcommand_args.get_one("scheduling"); - let availability = subcommand_args.get_one("availability"); - - let storage_controller = StorageController::from_env(env); - storage_controller - .node_configure(NodeConfigureRequest { - node_id: pageserver.conf.id, - scheduling: scheduling.cloned(), - availability: availability.cloned(), - }) - .await?; - } - Some(("status", subcommand_args)) => { match get_pageserver(env, subcommand_args)?.check_status().await { Ok(_) => println!("Page server is up and running"), @@ -1515,12 +1498,6 @@ fn cli() -> Command { .about("Restart local pageserver") .arg(pageserver_config_args.clone()) ) - .subcommand(Command::new("set-state") - .arg(Arg::new("availability").value_parser(value_parser!(NodeAvailability)).long("availability").action(ArgAction::Set).help("Availability state: offline,active")) - .arg(Arg::new("scheduling").value_parser(value_parser!(NodeSchedulingPolicy)).long("scheduling").action(ArgAction::Set).help("Scheduling state: draining,pause,filling,active")) - .about("Set scheduling or availability state of pageserver node") - .arg(pageserver_config_args.clone()) - ) ) .subcommand( Command::new("storage_controller") diff --git a/control_plane/storcon_cli/Cargo.toml b/control_plane/storcon_cli/Cargo.toml new file mode 100644 index 0000000000..61eb7fa4e4 --- /dev/null +++ b/control_plane/storcon_cli/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "storcon_cli" +version = "0.1.0" +edition.workspace = true +license.workspace = true + + +[dependencies] +anyhow.workspace = true +clap.workspace = true +comfy-table.workspace = true +hyper.workspace = true +pageserver_api.workspace = true +pageserver_client.workspace = true +reqwest.workspace = true +serde.workspace = true +serde_json = { workspace = true, features = ["raw_value"] } +thiserror.workspace = true +tokio.workspace = true +tracing.workspace = true +utils.workspace = true +workspace_hack.workspace = true + diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs new file mode 100644 index 0000000000..f72bc9a2a9 --- /dev/null +++ b/control_plane/storcon_cli/src/main.rs @@ -0,0 +1,587 @@ +use std::{collections::HashMap, str::FromStr}; + +use clap::{Parser, Subcommand}; +use hyper::Method; +use pageserver_api::{ + controller_api::{ + NodeAvailabilityWrapper, NodeDescribeResponse, ShardSchedulingPolicy, + TenantDescribeResponse, TenantPolicyRequest, + }, + models::{ + ShardParameters, TenantConfig, TenantConfigRequest, TenantCreateRequest, + TenantShardSplitRequest, TenantShardSplitResponse, + }, + shard::{ShardStripeSize, TenantShardId}, +}; +use pageserver_client::mgmt_api::{self, ResponseErrorMessageExt}; +use reqwest::Url; +use serde::{de::DeserializeOwned, Serialize}; +use utils::id::{NodeId, TenantId}; + +use pageserver_api::controller_api::{ + NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy, + TenantLocateResponse, TenantShardMigrateRequest, TenantShardMigrateResponse, +}; + +#[derive(Subcommand, Debug)] +enum Command { + /// Register a pageserver with the storage controller. This shouldn't usually be necessary, + /// since pageservers auto-register when they start up + NodeRegister { + #[arg(long)] + node_id: NodeId, + + #[arg(long)] + listen_pg_addr: String, + #[arg(long)] + listen_pg_port: u16, + + #[arg(long)] + listen_http_addr: String, + #[arg(long)] + listen_http_port: u16, + }, + + /// Modify a node's configuration in the storage controller + NodeConfigure { + #[arg(long)] + node_id: NodeId, + + /// Availability is usually auto-detected based on heartbeats. Set 'offline' here to + /// manually mark a node offline + #[arg(long)] + availability: Option, + /// Scheduling policy controls whether tenant shards may be scheduled onto this node. + #[arg(long)] + scheduling: Option, + }, + /// Modify a tenant's policies in the storage controller + TenantPolicy { + #[arg(long)] + tenant_id: TenantId, + /// Placement policy controls whether a tenant is `detached`, has only a secondary location (`secondary`), + /// or is in the normal attached state with N secondary locations (`attached:N`) + #[arg(long)] + placement: Option, + /// Scheduling policy enables pausing the controller's scheduling activity involving this tenant. `active` is normal, + /// `essential` disables optimization scheduling changes, `pause` disables all scheduling changes, and `stop` prevents + /// all reconciliation activity including for scheduling changes already made. `pause` and `stop` can make a tenant + /// unavailable, and are only for use in emergencies. + #[arg(long)] + scheduling: Option, + }, + /// List nodes known to the storage controller + Nodes {}, + /// List tenants known to the storage controller + Tenants {}, + /// Create a new tenant in the storage controller, and by extension on pageservers. + TenantCreate { + #[arg(long)] + tenant_id: TenantId, + }, + /// Delete a tenant in the storage controller, and by extension on pageservers. + TenantDelete { + #[arg(long)] + tenant_id: TenantId, + }, + /// Split an existing tenant into a higher number of shards than its current shard count. + TenantShardSplit { + #[arg(long)] + tenant_id: TenantId, + #[arg(long)] + shard_count: u8, + /// Optional, in 8kiB pages. e.g. set 2048 for 16MB stripes. + #[arg(long)] + stripe_size: Option, + }, + /// Migrate the attached location for a tenant shard to a specific pageserver. + TenantShardMigrate { + #[arg(long)] + tenant_shard_id: TenantShardId, + #[arg(long)] + node: NodeId, + }, + /// Modify the pageserver tenant configuration of a tenant: this is the configuration structure + /// that is passed through to pageservers, and does not affect storage controller behavior. + TenantConfig { + #[arg(long)] + tenant_id: TenantId, + #[arg(long)] + config: String, + }, + /// Attempt to balance the locations for a tenant across pageservers. This is a client-side + /// alternative to the storage controller's scheduling optimization behavior. + TenantScatter { + #[arg(long)] + tenant_id: TenantId, + }, + /// Print details about a particular tenant, including all its shards' states. + TenantDescribe { + #[arg(long)] + tenant_id: TenantId, + }, +} + +#[derive(Parser)] +#[command( + author, + version, + about, + long_about = "CLI for Storage Controller Support/Debug" +)] +#[command(arg_required_else_help(true))] +struct Cli { + #[arg(long)] + /// URL to storage controller. e.g. http://127.0.0.1:1234 when using `neon_local` + api: Url, + + #[arg(long)] + /// JWT token for authenticating with storage controller. Depending on the API used, this + /// should have either `pageserverapi` or `admin` scopes: for convenience, you should mint + /// a token with both scopes to use with this tool. + jwt: Option, + + #[command(subcommand)] + command: Command, +} + +#[derive(Debug, Clone)] +struct PlacementPolicyArg(PlacementPolicy); + +impl FromStr for PlacementPolicyArg { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + match s { + "detached" => Ok(Self(PlacementPolicy::Detached)), + "secondary" => Ok(Self(PlacementPolicy::Secondary)), + _ if s.starts_with("attached:") => { + let mut splitter = s.split(':'); + let _prefix = splitter.next().unwrap(); + match splitter.next().and_then(|s| s.parse::().ok()) { + Some(n) => Ok(Self(PlacementPolicy::Attached(n))), + None => Err(anyhow::anyhow!( + "Invalid format '{s}', a valid example is 'attached:1'" + )), + } + } + _ => Err(anyhow::anyhow!( + "Unknown placement policy '{s}', try detached,secondary,attached:" + )), + } + } +} + +#[derive(Debug, Clone)] +struct ShardSchedulingPolicyArg(ShardSchedulingPolicy); + +impl FromStr for ShardSchedulingPolicyArg { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + match s { + "active" => Ok(Self(ShardSchedulingPolicy::Active)), + "essential" => Ok(Self(ShardSchedulingPolicy::Essential)), + "pause" => Ok(Self(ShardSchedulingPolicy::Pause)), + "stop" => Ok(Self(ShardSchedulingPolicy::Stop)), + _ => Err(anyhow::anyhow!( + "Unknown scheduling policy '{s}', try active,essential,pause,stop" + )), + } + } +} + +#[derive(Debug, Clone)] +struct NodeAvailabilityArg(NodeAvailabilityWrapper); + +impl FromStr for NodeAvailabilityArg { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + match s { + "active" => Ok(Self(NodeAvailabilityWrapper::Active)), + "offline" => Ok(Self(NodeAvailabilityWrapper::Offline)), + _ => Err(anyhow::anyhow!("Unknown availability state '{s}'")), + } + } +} + +struct Client { + base_url: Url, + jwt_token: Option, + client: reqwest::Client, +} + +impl Client { + fn new(base_url: Url, jwt_token: Option) -> Self { + Self { + base_url, + jwt_token, + client: reqwest::ClientBuilder::new() + .build() + .expect("Failed to construct http client"), + } + } + + /// Simple HTTP request wrapper for calling into attachment service + async fn dispatch( + &self, + method: hyper::Method, + path: String, + body: Option, + ) -> mgmt_api::Result + where + RQ: Serialize + Sized, + RS: DeserializeOwned + Sized, + { + // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out + // for general purpose API access. + let url = Url::from_str(&format!( + "http://{}:{}/{path}", + self.base_url.host_str().unwrap(), + self.base_url.port().unwrap() + )) + .unwrap(); + + let mut builder = self.client.request(method, url); + if let Some(body) = body { + builder = builder.json(&body) + } + if let Some(jwt_token) = &self.jwt_token { + builder = builder.header( + reqwest::header::AUTHORIZATION, + format!("Bearer {jwt_token}"), + ); + } + + let response = builder.send().await.map_err(mgmt_api::Error::ReceiveBody)?; + let response = response.error_from_body().await?; + + response + .json() + .await + .map_err(pageserver_client::mgmt_api::Error::ReceiveBody) + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let cli = Cli::parse(); + + let storcon_client = Client::new(cli.api.clone(), cli.jwt.clone()); + + let mut trimmed = cli.api.to_string(); + trimmed.pop(); + let vps_client = mgmt_api::Client::new(trimmed, cli.jwt.as_deref()); + + match cli.command { + Command::NodeRegister { + node_id, + listen_pg_addr, + listen_pg_port, + listen_http_addr, + listen_http_port, + } => { + storcon_client + .dispatch::<_, ()>( + Method::POST, + "control/v1/node".to_string(), + Some(NodeRegisterRequest { + node_id, + listen_pg_addr, + listen_pg_port, + listen_http_addr, + listen_http_port, + }), + ) + .await?; + } + Command::TenantCreate { tenant_id } => { + vps_client + .tenant_create(&TenantCreateRequest { + new_tenant_id: TenantShardId::unsharded(tenant_id), + generation: None, + shard_parameters: ShardParameters::default(), + placement_policy: Some(PlacementPolicy::Attached(1)), + config: TenantConfig::default(), + }) + .await?; + } + Command::TenantDelete { tenant_id } => { + let status = vps_client + .tenant_delete(TenantShardId::unsharded(tenant_id)) + .await?; + tracing::info!("Delete status: {}", status); + } + Command::Nodes {} => { + let resp = storcon_client + .dispatch::<(), Vec>( + Method::GET, + "control/v1/node".to_string(), + None, + ) + .await?; + let mut table = comfy_table::Table::new(); + table.set_header(["Id", "Hostname", "Scheduling", "Availability"]); + for node in resp { + table.add_row([ + format!("{}", node.id), + node.listen_http_addr, + format!("{:?}", node.scheduling), + format!("{:?}", node.availability), + ]); + } + println!("{table}"); + } + Command::NodeConfigure { + node_id, + availability, + scheduling, + } => { + let req = NodeConfigureRequest { + node_id, + availability: availability.map(|a| a.0), + scheduling, + }; + storcon_client + .dispatch::<_, ()>( + Method::PUT, + format!("control/v1/node/{node_id}/config"), + Some(req), + ) + .await?; + } + Command::Tenants {} => { + let resp = storcon_client + .dispatch::<(), Vec>( + Method::GET, + "control/v1/tenant".to_string(), + None, + ) + .await?; + let mut table = comfy_table::Table::new(); + table.set_header([ + "TenantId", + "ShardCount", + "StripeSize", + "Placement", + "Scheduling", + ]); + for tenant in resp { + let shard_zero = tenant.shards.into_iter().next().unwrap(); + table.add_row([ + format!("{}", tenant.tenant_id), + format!("{}", shard_zero.tenant_shard_id.shard_count.literal()), + format!("{:?}", tenant.stripe_size), + format!("{:?}", tenant.policy), + format!("{:?}", shard_zero.scheduling_policy), + ]); + } + + println!("{table}"); + } + Command::TenantPolicy { + tenant_id, + placement, + scheduling, + } => { + let req = TenantPolicyRequest { + scheduling: scheduling.map(|s| s.0), + placement: placement.map(|p| p.0), + }; + storcon_client + .dispatch::<_, ()>( + Method::PUT, + format!("control/v1/tenant/{tenant_id}/policy"), + Some(req), + ) + .await?; + } + Command::TenantShardSplit { + tenant_id, + shard_count, + stripe_size, + } => { + let req = TenantShardSplitRequest { + new_shard_count: shard_count, + new_stripe_size: stripe_size.map(ShardStripeSize), + }; + + let response = storcon_client + .dispatch::( + Method::PUT, + format!("control/v1/tenant/{tenant_id}/shard_split"), + Some(req), + ) + .await?; + println!( + "Split tenant {} into {} shards: {}", + tenant_id, + shard_count, + response + .new_shards + .iter() + .map(|s| format!("{:?}", s)) + .collect::>() + .join(",") + ); + } + Command::TenantShardMigrate { + tenant_shard_id, + node, + } => { + let req = TenantShardMigrateRequest { + tenant_shard_id, + node_id: node, + }; + + storcon_client + .dispatch::( + Method::PUT, + format!("control/v1/tenant/{tenant_shard_id}/migrate"), + Some(req), + ) + .await?; + } + Command::TenantConfig { tenant_id, config } => { + let tenant_conf = serde_json::from_str(&config)?; + + vps_client + .tenant_config(&TenantConfigRequest { + tenant_id, + config: tenant_conf, + }) + .await?; + } + Command::TenantScatter { tenant_id } => { + // Find the shards + let locate_response = storcon_client + .dispatch::<(), TenantLocateResponse>( + Method::GET, + format!("control/v1/tenant/{tenant_id}/locate"), + None, + ) + .await?; + let shards = locate_response.shards; + + let mut node_to_shards: HashMap> = HashMap::new(); + let shard_count = shards.len(); + for s in shards { + let entry = node_to_shards.entry(s.node_id).or_default(); + entry.push(s.shard_id); + } + + // Load list of available nodes + let nodes_resp = storcon_client + .dispatch::<(), Vec>( + Method::GET, + "control/v1/node".to_string(), + None, + ) + .await?; + + for node in nodes_resp { + if matches!(node.availability, NodeAvailabilityWrapper::Active) { + node_to_shards.entry(node.id).or_default(); + } + } + + let max_shard_per_node = shard_count / node_to_shards.len(); + + loop { + let mut migrate_shard = None; + for shards in node_to_shards.values_mut() { + if shards.len() > max_shard_per_node { + // Pick the emptiest + migrate_shard = Some(shards.pop().unwrap()); + } + } + let Some(migrate_shard) = migrate_shard else { + break; + }; + + // Pick the emptiest node to migrate to + let mut destinations = node_to_shards + .iter() + .map(|(k, v)| (k, v.len())) + .collect::>(); + destinations.sort_by_key(|i| i.1); + let (destination_node, destination_count) = *destinations.first().unwrap(); + if destination_count + 1 > max_shard_per_node { + // Even the emptiest destination doesn't have space: we're done + break; + } + let destination_node = *destination_node; + + node_to_shards + .get_mut(&destination_node) + .unwrap() + .push(migrate_shard); + + println!("Migrate {} -> {} ...", migrate_shard, destination_node); + + storcon_client + .dispatch::( + Method::PUT, + format!("control/v1/tenant/{migrate_shard}/migrate"), + Some(TenantShardMigrateRequest { + tenant_shard_id: migrate_shard, + node_id: destination_node, + }), + ) + .await?; + println!("Migrate {} -> {} OK", migrate_shard, destination_node); + } + + // Spread the shards across the nodes + } + Command::TenantDescribe { tenant_id } => { + let describe_response = storcon_client + .dispatch::<(), TenantDescribeResponse>( + Method::GET, + format!("control/v1/tenant/{tenant_id}"), + None, + ) + .await?; + let shards = describe_response.shards; + let mut table = comfy_table::Table::new(); + table.set_header(["Shard", "Attached", "Secondary", "Last error", "status"]); + for shard in shards { + let secondary = shard + .node_secondary + .iter() + .map(|n| format!("{}", n)) + .collect::>() + .join(","); + + let mut status_parts = Vec::new(); + if shard.is_reconciling { + status_parts.push("reconciling"); + } + + if shard.is_pending_compute_notification { + status_parts.push("pending_compute"); + } + + if shard.is_splitting { + status_parts.push("splitting"); + } + let status = status_parts.join(","); + + table.add_row([ + format!("{}", shard.tenant_shard_id), + shard + .node_attached + .map(|n| format!("{}", n)) + .unwrap_or(String::new()), + secondary, + shard.last_error, + status, + ]); + } + println!("{table}"); + } + } + + Ok(()) +} diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index dcf9e38106..be24d452b6 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -4,7 +4,7 @@ use std::str::FromStr; /// API (`/control/v1` prefix). Implemented by the server /// in [`attachment_service::http`] use serde::{Deserialize, Serialize}; -use utils::id::NodeId; +use utils::id::{NodeId, TenantId}; use crate::{ models::{ShardParameters, TenantConfig}, @@ -68,12 +68,27 @@ pub struct TenantLocateResponse { #[derive(Serialize, Deserialize)] pub struct TenantDescribeResponse { + pub tenant_id: TenantId, pub shards: Vec, pub stripe_size: ShardStripeSize, pub policy: PlacementPolicy, pub config: TenantConfig, } +#[derive(Serialize, Deserialize)] +pub struct NodeDescribeResponse { + pub id: NodeId, + + pub availability: NodeAvailabilityWrapper, + pub scheduling: NodeSchedulingPolicy, + + pub listen_http_addr: String, + pub listen_http_port: u16, + + pub listen_pg_addr: String, + pub listen_pg_port: u16, +} + #[derive(Serialize, Deserialize)] pub struct TenantDescribeResponseShard { pub tenant_shard_id: TenantShardId, @@ -89,6 +104,8 @@ pub struct TenantDescribeResponseShard { pub is_pending_compute_notification: bool, /// A shard split is currently underway pub is_splitting: bool, + + pub scheduling_policy: ShardSchedulingPolicy, } /// Explicitly migrating a particular shard is a low level operation @@ -103,7 +120,7 @@ pub struct TenantShardMigrateRequest { /// Utilisation score indicating how good a candidate a pageserver /// is for scheduling the next tenant. See [`crate::models::PageserverUtilization`]. /// Lower values are better. -#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, PartialOrd, Ord)] +#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Debug)] pub struct UtilizationScore(pub u64); impl UtilizationScore { @@ -112,7 +129,7 @@ impl UtilizationScore { } } -#[derive(Serialize, Clone, Copy)] +#[derive(Serialize, Deserialize, Clone, Copy, Debug)] #[serde(into = "NodeAvailabilityWrapper")] pub enum NodeAvailability { // Normal, happy state @@ -135,7 +152,7 @@ impl Eq for NodeAvailability {} // This wrapper provides serde functionality and it should only be used to // communicate with external callers which don't know or care about the // utilisation score of the pageserver it is targeting. -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Copy, Debug)] pub enum NodeAvailabilityWrapper { Active, Offline, @@ -161,21 +178,6 @@ impl From for NodeAvailabilityWrapper { } } -impl FromStr for NodeAvailability { - type Err = anyhow::Error; - - fn from_str(s: &str) -> Result { - match s { - // This is used when parsing node configuration requests from neon-local. - // Assume the worst possible utilisation score - // and let it get updated via the heartbeats. - "active" => Ok(Self::Active(UtilizationScore::worst())), - "offline" => Ok(Self::Offline), - _ => Err(anyhow::anyhow!("Unknown availability state '{s}'")), - } - } -} - #[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)] pub enum ShardSchedulingPolicy { // Normal mode: the tenant's scheduled locations may be updated at will, including @@ -202,7 +204,7 @@ impl Default for ShardSchedulingPolicy { } } -#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq)] +#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)] pub enum NodeSchedulingPolicy { Active, Filling, diff --git a/test_runner/regress/test_sharding_service.py b/test_runner/regress/test_sharding_service.py index 5a86e03d2b..7df0b58596 100644 --- a/test_runner/regress/test_sharding_service.py +++ b/test_runner/regress/test_sharding_service.py @@ -1,3 +1,4 @@ +import json import time from collections import defaultdict from datetime import datetime, timezone @@ -24,7 +25,7 @@ from fixtures.pageserver.utils import ( from fixtures.pg_version import PgVersion from fixtures.remote_storage import RemoteStorageKind, s3_storage from fixtures.types import TenantId, TenantShardId, TimelineId -from fixtures.utils import run_pg_bench_small, wait_until +from fixtures.utils import run_pg_bench_small, subprocess_capture, wait_until from mypy_boto3_s3.type_defs import ( ObjectTypeDef, ) @@ -1131,3 +1132,89 @@ def test_storage_controller_shard_scheduling_policy(neon_env_builder: NeonEnvBui # And indeed the tenant should be attached assert len(env.pageserver.http_client().tenant_list_locations()["tenant_shards"]) == 1 + + +def test_storcon_cli(neon_env_builder: NeonEnvBuilder): + """ + The storage controller command line interface (storcon-cli) is an internal tool. Most tests + just use the APIs directly: this test exercises some basics of the CLI as a regression test + that the client remains usable as the server evolves. + """ + output_dir = neon_env_builder.test_output_dir + shard_count = 4 + env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count) + base_args = [env.neon_binpath / "storcon_cli", "--api", env.storage_controller_api] + + def storcon_cli(args): + """ + CLI wrapper: returns stdout split into a list of non-empty strings + """ + (output_path, stdout, status_code) = subprocess_capture( + output_dir, + [str(s) for s in base_args + args], + echo_stderr=True, + echo_stdout=True, + env={}, + check=False, + capture_stdout=True, + timeout=10, + ) + if status_code: + log.warning(f"Command {args} failed") + log.warning(f"Output at: {output_path}") + + raise RuntimeError("CLI failure (check logs for stderr)") + + assert stdout is not None + return [line.strip() for line in stdout.split("\n") if line.strip()] + + # List nodes + node_lines = storcon_cli(["nodes"]) + # Table header, footer, and one line of data + assert len(node_lines) == 5 + assert "localhost" in node_lines[3] + + # Pause scheduling onto a node + storcon_cli(["node-configure", "--node-id", "1", "--scheduling", "pause"]) + assert "Pause" in storcon_cli(["nodes"])[3] + + # Make a node offline + storcon_cli(["node-configure", "--node-id", "1", "--availability", "offline"]) + assert "Offline" in storcon_cli(["nodes"])[3] + + # List tenants + tenant_lines = storcon_cli(["tenants"]) + assert len(tenant_lines) == 5 + assert str(env.initial_tenant) in tenant_lines[3] + + env.storage_controller.allowed_errors.append(".*Scheduling is disabled by policy.*") + + # Describe a tenant + tenant_lines = storcon_cli(["tenant-describe", "--tenant-id", str(env.initial_tenant)]) + assert len(tenant_lines) == 3 + shard_count * 2 + assert str(env.initial_tenant) in tenant_lines[3] + + # Pause changes on a tenant + storcon_cli(["tenant-policy", "--tenant-id", str(env.initial_tenant), "--scheduling", "stop"]) + assert "Stop" in storcon_cli(["tenants"])[3] + + # Change a tenant's placement + storcon_cli( + ["tenant-policy", "--tenant-id", str(env.initial_tenant), "--placement", "secondary"] + ) + assert "Secondary" in storcon_cli(["tenants"])[3] + + # Modify a tenant's config + storcon_cli( + [ + "tenant-config", + "--tenant-id", + str(env.initial_tenant), + "--config", + json.dumps({"pitr_interval": "1m"}), + ] + ) + + # Quiesce any background reconciliation before doing consistency check + env.storage_controller.reconcile_until_idle(timeout_secs=10) + env.storage_controller.consistency_check()