## Problem

The storage controller didn't previously consider AZ locality between computes and pageservers when scheduling tenant shards onto nodes. The control plane has this feature, and since we are migrating tenants away from it, we need feature parity to avoid performance degradations.

## Summary of changes

The change itself is fairly simple:
1. Thread AZ info into the scheduler.
2. Add an extra member to the scheduling scores.

Step (2) deserves some more discussion. Let's break it down by the shard type being scheduled:

**Attached Shards**

We want the attached shards of a tenant to end up in the tenant's preferred AZ, since that is where the compute is likely to be. The AZ member of `NodeAttachmentSchedulingScore` has been placed below the affinity score (so it carries the second-biggest weight when picking a node). The rationale for going below the affinity score is to avoid placing all shards of a single tenant on the same node in two-node regions, since that would let one tenant dominate the workload of an entire pageserver. I'm not 100% sure this is the right decision, so I'm open to discussing hoisting the AZ up to first place.

**Secondary Shards**

We want the secondary shards of a tenant to be scheduled in a different AZ from the preferred one, for HA purposes. The AZ member of `NodeSecondarySchedulingScore` has been placed first, so nodes in AZs other than the preferred one are always considered first. On small clusters this can mean that all the secondaries of a tenant are scheduled to the same pageserver, but secondaries don't use as many resources as the attached location, so IMO the argument made for attached shards doesn't hold here.

Related: https://github.com/neondatabase/neon/issues/8848
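The relative weights described above fall out of plain lexicographic comparison of the score structs: whichever field is declared first dominates the ordering. Below is a minimal, self-contained sketch of that idea; the field names and types are assumptions for illustration, not the real `NodeAttachmentSchedulingScore` / `NodeSecondarySchedulingScore` definitions.

```rust
// Illustrative sketch only: assumed field names, not the actual score types.
// `derive(Ord)` compares fields in declaration order (lexicographically),
// so a field's position is its weight; lower scores are preferred.

#[derive(PartialEq, Eq, PartialOrd, Ord, Debug)]
struct AttachedShardScore {
    /// Shards of the same tenant already on this node (anti-affinity), compared
    /// first so one tenant still spreads across nodes in a two-node region.
    affinity: usize,
    /// `false` sorts before `true`, so nodes in the tenant's preferred AZ win
    /// once affinity is equal.
    az_mismatch: bool,
    /// Utilisation tie-breaker.
    attached_shard_count: usize,
}

#[derive(PartialEq, Eq, PartialOrd, Ord, Debug)]
struct SecondaryShardScore {
    /// Compared first: secondaries prefer nodes *outside* the preferred AZ.
    in_preferred_az: bool,
    affinity: usize,
    total_shard_count: usize,
}

fn main() {
    // Attached shards: AZ preference only applies once anti-affinity ties.
    let in_az = AttachedShardScore { affinity: 0, az_mismatch: false, attached_shard_count: 10 };
    let out_of_az = AttachedShardScore { affinity: 0, az_mismatch: true, attached_shard_count: 2 };
    assert!(in_az < out_of_az); // the lower score is scheduled first

    // Secondary shards: being outside the preferred AZ beats any utilisation advantage.
    let other_az = SecondaryShardScore { in_preferred_az: false, affinity: 3, total_shard_count: 50 };
    let same_az = SecondaryShardScore { in_preferred_az: true, affinity: 0, total_shard_count: 1 };
    assert!(other_az < same_az);
}
```

With the AZ field second for attached shards, anti-affinity still spreads a tenant's shards across nodes before the AZ preference kicks in; with it first for secondaries, an out-of-AZ node always wins regardless of load.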
Source of the storage controller support/debug CLI:

```rust
use futures::StreamExt;
use std::{str::FromStr, time::Duration};

use clap::{Parser, Subcommand};
use pageserver_api::{
    controller_api::{
        AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse,
        ShardSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
    },
    models::{
        EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
        ShardParameters, TenantConfig, TenantConfigRequest, TenantShardSplitRequest,
        TenantShardSplitResponse,
    },
    shard::{ShardStripeSize, TenantShardId},
};
use pageserver_client::mgmt_api::{self};
use reqwest::{Method, StatusCode, Url};
use utils::id::{NodeId, TenantId};

use pageserver_api::controller_api::{
    NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
    TenantShardMigrateRequest, TenantShardMigrateResponse,
};
use storage_controller_client::control_api::Client;

#[derive(Subcommand, Debug)]
enum Command {
    /// Register a pageserver with the storage controller. This shouldn't usually be necessary,
    /// since pageservers auto-register when they start up
    NodeRegister {
        #[arg(long)]
        node_id: NodeId,

        #[arg(long)]
        listen_pg_addr: String,
        #[arg(long)]
        listen_pg_port: u16,

        #[arg(long)]
        listen_http_addr: String,
        #[arg(long)]
        listen_http_port: u16,
        #[arg(long)]
        availability_zone_id: String,
    },

    /// Modify a node's configuration in the storage controller
    NodeConfigure {
        #[arg(long)]
        node_id: NodeId,

        /// Availability is usually auto-detected based on heartbeats. Set 'offline' here to
        /// manually mark a node offline
        #[arg(long)]
        availability: Option<NodeAvailabilityArg>,
        /// Scheduling policy controls whether tenant shards may be scheduled onto this node.
        #[arg(long)]
        scheduling: Option<NodeSchedulingPolicy>,
    },
    NodeDelete {
        #[arg(long)]
        node_id: NodeId,
    },
    /// Modify a tenant's policies in the storage controller
    TenantPolicy {
        #[arg(long)]
        tenant_id: TenantId,
        /// Placement policy controls whether a tenant is `detached`, has only a secondary location (`secondary`),
        /// or is in the normal attached state with N secondary locations (`attached:N`)
        #[arg(long)]
        placement: Option<PlacementPolicyArg>,
        /// Scheduling policy enables pausing the controller's scheduling activity involving this tenant. `active` is normal,
        /// `essential` disables optimization scheduling changes, `pause` disables all scheduling changes, and `stop` prevents
        /// all reconciliation activity including for scheduling changes already made. `pause` and `stop` can make a tenant
        /// unavailable, and are only for use in emergencies.
        #[arg(long)]
        scheduling: Option<ShardSchedulingPolicyArg>,
    },
    /// List nodes known to the storage controller
    Nodes {},
    /// List tenants known to the storage controller
    Tenants {
        /// If this field is set, it will list the tenants on a specific node
        node_id: Option<NodeId>,
    },
    /// Create a new tenant in the storage controller, and by extension on pageservers.
    TenantCreate {
        #[arg(long)]
        tenant_id: TenantId,
    },
    /// Delete a tenant in the storage controller, and by extension on pageservers.
    TenantDelete {
        #[arg(long)]
        tenant_id: TenantId,
    },
    /// Split an existing tenant into a higher number of shards than its current shard count.
    TenantShardSplit {
        #[arg(long)]
        tenant_id: TenantId,
        #[arg(long)]
        shard_count: u8,
        /// Optional, in 8kiB pages. e.g. set 2048 for 16MB stripes.
        #[arg(long)]
        stripe_size: Option<u32>,
    },
    /// Migrate the attached location for a tenant shard to a specific pageserver.
    TenantShardMigrate {
        #[arg(long)]
        tenant_shard_id: TenantShardId,
        #[arg(long)]
        node: NodeId,
    },
    /// Modify the pageserver tenant configuration of a tenant: this is the configuration structure
    /// that is passed through to pageservers, and does not affect storage controller behavior.
    TenantConfig {
        #[arg(long)]
        tenant_id: TenantId,
        #[arg(long)]
        config: String,
    },
    /// Print details about a particular tenant, including all its shards' states.
    TenantDescribe {
        #[arg(long)]
        tenant_id: TenantId,
    },
    /// For a tenant which hasn't been onboarded to the storage controller yet, add it in secondary
    /// mode so that it can warm up content on a pageserver.
    TenantWarmup {
        #[arg(long)]
        tenant_id: TenantId,
    },
    /// Uncleanly drop a tenant from the storage controller: this doesn't delete anything from pageservers. Appropriate
    /// if you e.g. used `tenant-warmup` by mistake on a tenant ID that doesn't really exist, or is in some other region.
    TenantDrop {
        #[arg(long)]
        tenant_id: TenantId,
        #[arg(long)]
        unclean: bool,
    },
    NodeDrop {
        #[arg(long)]
        node_id: NodeId,
        #[arg(long)]
        unclean: bool,
    },
    TenantSetTimeBasedEviction {
        #[arg(long)]
        tenant_id: TenantId,
        #[arg(long)]
        period: humantime::Duration,
        #[arg(long)]
        threshold: humantime::Duration,
    },
    // Migrate away from a set of specified pageservers by moving the primary attachments to pageservers
    // outside of the specified set.
    BulkMigrate {
        // Set of pageserver node ids to drain.
        #[arg(long)]
        nodes: Vec<NodeId>,
        // Optional: migration concurrency (default is 8)
        #[arg(long)]
        concurrency: Option<usize>,
        // Optional: maximum number of shards to migrate
        #[arg(long)]
        max_shards: Option<usize>,
        // Optional: when set to true, nothing is migrated, but the plan is printed to stdout
        #[arg(long)]
        dry_run: Option<bool>,
    },
    /// Start draining the specified pageserver.
    /// The drain is complete when the scheduling policy returns to active.
    StartDrain {
        #[arg(long)]
        node_id: NodeId,
    },
    /// Cancel draining the specified pageserver and wait for `timeout`
    /// for the operation to be canceled. May be retried.
    CancelDrain {
        #[arg(long)]
        node_id: NodeId,
        #[arg(long)]
        timeout: humantime::Duration,
    },
    /// Start filling the specified pageserver.
    /// The fill is complete when the scheduling policy returns to active.
    StartFill {
        #[arg(long)]
        node_id: NodeId,
    },
    /// Cancel filling the specified pageserver and wait for `timeout`
    /// for the operation to be canceled. May be retried.
    CancelFill {
        #[arg(long)]
        node_id: NodeId,
        #[arg(long)]
        timeout: humantime::Duration,
    },
}

#[derive(Parser)]
#[command(
    author,
    version,
    about,
    long_about = "CLI for Storage Controller Support/Debug"
)]
#[command(arg_required_else_help(true))]
struct Cli {
    #[arg(long)]
    /// URL to storage controller. e.g. http://127.0.0.1:1234 when using `neon_local`
    api: Url,

    #[arg(long)]
    /// JWT token for authenticating with storage controller. Depending on the API used, this
    /// should have either `pageserverapi` or `admin` scopes: for convenience, you should mint
    /// a token with both scopes to use with this tool.
    jwt: Option<String>,

    #[command(subcommand)]
    command: Command,
}

#[derive(Debug, Clone)]
struct PlacementPolicyArg(PlacementPolicy);

impl FromStr for PlacementPolicyArg {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "detached" => Ok(Self(PlacementPolicy::Detached)),
            "secondary" => Ok(Self(PlacementPolicy::Secondary)),
            _ if s.starts_with("attached:") => {
                let mut splitter = s.split(':');
                let _prefix = splitter.next().unwrap();
                match splitter.next().and_then(|s| s.parse::<usize>().ok()) {
                    Some(n) => Ok(Self(PlacementPolicy::Attached(n))),
                    None => Err(anyhow::anyhow!(
                        "Invalid format '{s}', a valid example is 'attached:1'"
                    )),
                }
            }
            _ => Err(anyhow::anyhow!(
                "Unknown placement policy '{s}', try detached,secondary,attached:<n>"
            )),
        }
    }
}

#[derive(Debug, Clone)]
struct ShardSchedulingPolicyArg(ShardSchedulingPolicy);

impl FromStr for ShardSchedulingPolicyArg {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "active" => Ok(Self(ShardSchedulingPolicy::Active)),
            "essential" => Ok(Self(ShardSchedulingPolicy::Essential)),
            "pause" => Ok(Self(ShardSchedulingPolicy::Pause)),
            "stop" => Ok(Self(ShardSchedulingPolicy::Stop)),
            _ => Err(anyhow::anyhow!(
                "Unknown scheduling policy '{s}', try active,essential,pause,stop"
            )),
        }
    }
}

#[derive(Debug, Clone)]
struct NodeAvailabilityArg(NodeAvailabilityWrapper);

impl FromStr for NodeAvailabilityArg {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "active" => Ok(Self(NodeAvailabilityWrapper::Active)),
            "offline" => Ok(Self(NodeAvailabilityWrapper::Offline)),
            _ => Err(anyhow::anyhow!("Unknown availability state '{s}'")),
        }
    }
}

async fn wait_for_scheduling_policy<F>(
    client: Client,
    node_id: NodeId,
    timeout: Duration,
    f: F,
) -> anyhow::Result<NodeSchedulingPolicy>
where
    F: Fn(NodeSchedulingPolicy) -> bool,
{
    let waiter = tokio::time::timeout(timeout, async move {
        loop {
            let node = client
                .dispatch::<(), NodeDescribeResponse>(
                    Method::GET,
                    format!("control/v1/node/{node_id}"),
                    None,
                )
                .await?;

            if f(node.scheduling) {
                return Ok::<NodeSchedulingPolicy, mgmt_api::Error>(node.scheduling);
            }
        }
    });

    Ok(waiter.await??)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cli = Cli::parse();

    let storcon_client = Client::new(cli.api.clone(), cli.jwt.clone());

    // Trim the trailing '/' from the controller URL before handing it to the
    // pageserver management API client.
    let mut trimmed = cli.api.to_string();
    trimmed.pop();
    let vps_client = mgmt_api::Client::new(trimmed, cli.jwt.as_deref());

    match cli.command {
        Command::NodeRegister {
            node_id,
            listen_pg_addr,
            listen_pg_port,
            listen_http_addr,
            listen_http_port,
            availability_zone_id,
        } => {
            storcon_client
                .dispatch::<_, ()>(
                    Method::POST,
                    "control/v1/node".to_string(),
                    Some(NodeRegisterRequest {
                        node_id,
                        listen_pg_addr,
                        listen_pg_port,
                        listen_http_addr,
                        listen_http_port,
                        availability_zone_id: AvailabilityZone(availability_zone_id),
                    }),
                )
                .await?;
        }
        Command::TenantCreate { tenant_id } => {
            storcon_client
                .dispatch::<_, ()>(
                    Method::POST,
                    "v1/tenant".to_string(),
                    Some(TenantCreateRequest {
                        new_tenant_id: TenantShardId::unsharded(tenant_id),
                        generation: None,
                        shard_parameters: ShardParameters::default(),
                        placement_policy: Some(PlacementPolicy::Attached(1)),
                        config: TenantConfig::default(),
                    }),
                )
                .await?;
        }
        Command::TenantDelete { tenant_id } => {
            let status = vps_client
                .tenant_delete(TenantShardId::unsharded(tenant_id))
                .await?;
            tracing::info!("Delete status: {}", status);
        }
        Command::Nodes {} => {
            let mut resp = storcon_client
                .dispatch::<(), Vec<NodeDescribeResponse>>(
                    Method::GET,
                    "control/v1/node".to_string(),
                    None,
                )
                .await?;

            resp.sort_by(|a, b| a.listen_http_addr.cmp(&b.listen_http_addr));

            let mut table = comfy_table::Table::new();
            table.set_header(["Id", "Hostname", "Scheduling", "Availability"]);
            for node in resp {
                table.add_row([
                    format!("{}", node.id),
                    node.listen_http_addr,
                    format!("{:?}", node.scheduling),
                    format!("{:?}", node.availability),
                ]);
            }
            println!("{table}");
        }
        Command::NodeConfigure {
            node_id,
            availability,
            scheduling,
        } => {
            let req = NodeConfigureRequest {
                node_id,
                availability: availability.map(|a| a.0),
                scheduling,
            };
            storcon_client
                .dispatch::<_, ()>(
                    Method::PUT,
                    format!("control/v1/node/{node_id}/config"),
                    Some(req),
                )
                .await?;
        }
        Command::Tenants {
            node_id: Some(node_id),
        } => {
            let describe_response = storcon_client
                .dispatch::<(), NodeShardResponse>(
                    Method::GET,
                    format!("control/v1/node/{node_id}/shards"),
                    None,
                )
                .await?;
            let shards = describe_response.shards;
            let mut table = comfy_table::Table::new();
            table.set_header([
                "Shard",
                "Intended Primary/Secondary",
                "Observed Primary/Secondary",
            ]);
            for shard in shards {
                table.add_row([
                    format!("{}", shard.tenant_shard_id),
                    match shard.is_intended_secondary {
                        None => "".to_string(),
                        Some(true) => "Secondary".to_string(),
                        Some(false) => "Primary".to_string(),
                    },
                    match shard.is_observed_secondary {
                        None => "".to_string(),
                        Some(true) => "Secondary".to_string(),
                        Some(false) => "Primary".to_string(),
                    },
                ]);
            }
            println!("{table}");
        }
        Command::Tenants { node_id: None } => {
            let mut resp = storcon_client
                .dispatch::<(), Vec<TenantDescribeResponse>>(
                    Method::GET,
                    "control/v1/tenant".to_string(),
                    None,
                )
                .await?;

            resp.sort_by(|a, b| a.tenant_id.cmp(&b.tenant_id));

            let mut table = comfy_table::Table::new();
            table.set_header([
                "TenantId",
                "ShardCount",
                "StripeSize",
                "Placement",
                "Scheduling",
            ]);
            for tenant in resp {
                let shard_zero = tenant.shards.into_iter().next().unwrap();
                table.add_row([
                    format!("{}", tenant.tenant_id),
                    format!("{}", shard_zero.tenant_shard_id.shard_count.literal()),
                    format!("{:?}", tenant.stripe_size),
                    format!("{:?}", tenant.policy),
                    format!("{:?}", shard_zero.scheduling_policy),
                ]);
            }

            println!("{table}");
        }
        Command::TenantPolicy {
            tenant_id,
            placement,
            scheduling,
        } => {
            let req = TenantPolicyRequest {
                scheduling: scheduling.map(|s| s.0),
                placement: placement.map(|p| p.0),
            };
            storcon_client
                .dispatch::<_, ()>(
                    Method::PUT,
                    format!("control/v1/tenant/{tenant_id}/policy"),
                    Some(req),
                )
                .await?;
        }
        Command::TenantShardSplit {
            tenant_id,
            shard_count,
            stripe_size,
        } => {
            let req = TenantShardSplitRequest {
                new_shard_count: shard_count,
                new_stripe_size: stripe_size.map(ShardStripeSize),
            };

            let response = storcon_client
                .dispatch::<TenantShardSplitRequest, TenantShardSplitResponse>(
                    Method::PUT,
                    format!("control/v1/tenant/{tenant_id}/shard_split"),
                    Some(req),
                )
                .await?;
            println!(
                "Split tenant {} into {} shards: {}",
                tenant_id,
                shard_count,
                response
                    .new_shards
                    .iter()
                    .map(|s| format!("{:?}", s))
                    .collect::<Vec<_>>()
                    .join(",")
            );
        }
        Command::TenantShardMigrate {
            tenant_shard_id,
            node,
        } => {
            let req = TenantShardMigrateRequest {
                tenant_shard_id,
                node_id: node,
            };

            storcon_client
                .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
                    Method::PUT,
                    format!("control/v1/tenant/{tenant_shard_id}/migrate"),
                    Some(req),
                )
                .await?;
        }
        Command::TenantConfig { tenant_id, config } => {
            let tenant_conf = serde_json::from_str(&config)?;

            vps_client
                .tenant_config(&TenantConfigRequest {
                    tenant_id,
                    config: tenant_conf,
                })
                .await?;
        }
        Command::TenantDescribe { tenant_id } => {
            let describe_response = storcon_client
                .dispatch::<(), TenantDescribeResponse>(
                    Method::GET,
                    format!("control/v1/tenant/{tenant_id}"),
                    None,
                )
                .await?;
            let shards = describe_response.shards;
            let mut table = comfy_table::Table::new();
            table.set_header(["Shard", "Attached", "Secondary", "Last error", "status"]);
            for shard in shards {
                let secondary = shard
                    .node_secondary
                    .iter()
                    .map(|n| format!("{}", n))
                    .collect::<Vec<_>>()
                    .join(",");

                let mut status_parts = Vec::new();
                if shard.is_reconciling {
                    status_parts.push("reconciling");
                }

                if shard.is_pending_compute_notification {
                    status_parts.push("pending_compute");
                }

                if shard.is_splitting {
                    status_parts.push("splitting");
                }
                let status = status_parts.join(",");

                table.add_row([
                    format!("{}", shard.tenant_shard_id),
                    shard
                        .node_attached
                        .map(|n| format!("{}", n))
                        .unwrap_or(String::new()),
                    secondary,
                    shard.last_error,
                    status,
                ]);
            }
            println!("{table}");
        }
        Command::TenantWarmup { tenant_id } => {
            let describe_response = storcon_client
                .dispatch::<(), TenantDescribeResponse>(
                    Method::GET,
                    format!("control/v1/tenant/{tenant_id}"),
                    None,
                )
                .await;
            match describe_response {
                Ok(describe) => {
                    if matches!(describe.policy, PlacementPolicy::Secondary) {
                        // Fine: it's already known to controller in secondary mode: calling
                        // again to put it into secondary mode won't cause problems.
                    } else {
                        anyhow::bail!("Tenant already present with policy {:?}", describe.policy);
                    }
                }
                Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _)) => {
                    // Fine: this tenant isn't known to the storage controller yet.
                }
                Err(e) => {
                    // Unexpected API error
                    return Err(e.into());
                }
            }

            vps_client
                .location_config(
                    TenantShardId::unsharded(tenant_id),
                    pageserver_api::models::LocationConfig {
                        mode: pageserver_api::models::LocationConfigMode::Secondary,
                        generation: None,
                        secondary_conf: Some(LocationConfigSecondary { warm: true }),
                        shard_number: 0,
                        shard_count: 0,
                        shard_stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE.0,
                        tenant_conf: TenantConfig::default(),
                    },
                    None,
                    true,
                )
                .await?;

            let describe_response = storcon_client
                .dispatch::<(), TenantDescribeResponse>(
                    Method::GET,
                    format!("control/v1/tenant/{tenant_id}"),
                    None,
                )
                .await?;

            let secondary_ps_id = describe_response
                .shards
                .first()
                .unwrap()
                .node_secondary
                .first()
                .unwrap();

            println!("Tenant {tenant_id} warming up on pageserver {secondary_ps_id}");
            loop {
                let (status, progress) = vps_client
                    .tenant_secondary_download(
                        TenantShardId::unsharded(tenant_id),
                        Some(Duration::from_secs(10)),
                    )
                    .await?;
                println!(
                    "Progress: {}/{} layers, {}/{} bytes",
                    progress.layers_downloaded,
                    progress.layers_total,
                    progress.bytes_downloaded,
                    progress.bytes_total
                );
                match status {
                    StatusCode::OK => {
                        println!("Download complete");
                        break;
                    }
                    StatusCode::ACCEPTED => {
                        // Loop
                    }
                    _ => {
                        anyhow::bail!("Unexpected download status: {status}");
                    }
                }
            }
        }
        Command::TenantDrop { tenant_id, unclean } => {
            if !unclean {
                anyhow::bail!("This command is not a tenant deletion, and uncleanly drops all controller state for the tenant. If you know what you're doing, add `--unclean` to proceed.")
            }
            storcon_client
                .dispatch::<(), ()>(
                    Method::POST,
                    format!("debug/v1/tenant/{tenant_id}/drop"),
                    None,
                )
                .await?;
        }
        Command::NodeDrop { node_id, unclean } => {
            if !unclean {
                anyhow::bail!("This command is not a clean node decommission, and uncleanly drops all controller state for the node, without checking if any tenants still refer to it. If you know what you're doing, add `--unclean` to proceed.")
            }
            storcon_client
                .dispatch::<(), ()>(Method::POST, format!("debug/v1/node/{node_id}/drop"), None)
                .await?;
        }
        Command::NodeDelete { node_id } => {
            storcon_client
                .dispatch::<(), ()>(Method::DELETE, format!("control/v1/node/{node_id}"), None)
                .await?;
        }
        Command::TenantSetTimeBasedEviction {
            tenant_id,
            period,
            threshold,
        } => {
            vps_client
                .tenant_config(&TenantConfigRequest {
                    tenant_id,
                    config: TenantConfig {
                        eviction_policy: Some(EvictionPolicy::LayerAccessThreshold(
                            EvictionPolicyLayerAccessThreshold {
                                period: period.into(),
                                threshold: threshold.into(),
                            },
                        )),
                        heatmap_period: Some("300s".to_string()),
                        ..Default::default()
                    },
                })
                .await?;
        }
        Command::BulkMigrate {
            nodes,
            concurrency,
            max_shards,
            dry_run,
        } => {
            // Load the list of nodes, split them up into the drained and filled sets,
            // and validate that draining is possible.
            let node_descs = storcon_client
                .dispatch::<(), Vec<NodeDescribeResponse>>(
                    Method::GET,
                    "control/v1/node".to_string(),
                    None,
                )
                .await?;

            let mut node_to_drain_descs = Vec::new();
            let mut node_to_fill_descs = Vec::new();

            for desc in node_descs {
                let to_drain = nodes.iter().any(|id| *id == desc.id);
                if to_drain {
                    node_to_drain_descs.push(desc);
                } else {
                    node_to_fill_descs.push(desc);
                }
            }

            if nodes.len() != node_to_drain_descs.len() {
                anyhow::bail!("Bulk migration requested away from node which doesn't exist.")
            }

            node_to_fill_descs.retain(|desc| {
                matches!(desc.availability, NodeAvailabilityWrapper::Active)
                    && matches!(
                        desc.scheduling,
                        NodeSchedulingPolicy::Active | NodeSchedulingPolicy::Filling
                    )
            });

            if node_to_fill_descs.is_empty() {
                anyhow::bail!("There are no nodes to migrate to")
            }

            // Set the node scheduling policy to draining for the nodes which
            // we plan to drain.
            for node_desc in node_to_drain_descs.iter() {
                let req = NodeConfigureRequest {
                    node_id: node_desc.id,
                    availability: None,
                    scheduling: Some(NodeSchedulingPolicy::Draining),
                };

                storcon_client
                    .dispatch::<_, ()>(
                        Method::PUT,
                        format!("control/v1/node/{}/config", node_desc.id),
                        Some(req),
                    )
                    .await?;
            }

            // Perform the migration: move each tenant shard scheduled on a node to
            // be drained to a node which is being filled. A simple round robin
            // strategy is used to pick the new node.
            let tenants = storcon_client
                .dispatch::<(), Vec<TenantDescribeResponse>>(
                    Method::GET,
                    "control/v1/tenant".to_string(),
                    None,
                )
                .await?;

            let mut selected_node_idx = 0;

            struct MigrationMove {
                tenant_shard_id: TenantShardId,
                from: NodeId,
                to: NodeId,
            }

            let mut moves: Vec<MigrationMove> = Vec::new();

            let shards = tenants
                .into_iter()
                .flat_map(|tenant| tenant.shards.into_iter());
            for shard in shards {
                if let Some(max_shards) = max_shards {
                    if moves.len() >= max_shards {
                        println!(
                            "Stop planning shard moves since the requested maximum was reached"
                        );
                        break;
                    }
                }

                let should_migrate = {
                    if let Some(attached_to) = shard.node_attached {
                        node_to_drain_descs
                            .iter()
                            .map(|desc| desc.id)
                            .any(|id| id == attached_to)
                    } else {
                        false
                    }
                };

                if !should_migrate {
                    continue;
                }

                moves.push(MigrationMove {
                    tenant_shard_id: shard.tenant_shard_id,
                    from: shard
                        .node_attached
                        .expect("We only migrate attached tenant shards"),
                    to: node_to_fill_descs[selected_node_idx].id,
                });
                selected_node_idx = (selected_node_idx + 1) % node_to_fill_descs.len();
            }

            let total_moves = moves.len();

            if dry_run == Some(true) {
                println!("Dry run requested. Planned {total_moves} moves:");
                for mv in &moves {
                    println!("{}: {} -> {}", mv.tenant_shard_id, mv.from, mv.to)
                }

                return Ok(());
            }

            const DEFAULT_MIGRATE_CONCURRENCY: usize = 8;
            let mut stream = futures::stream::iter(moves)
                .map(|mv| {
                    let client = Client::new(cli.api.clone(), cli.jwt.clone());
                    async move {
                        client
                            .dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
                                Method::PUT,
                                format!("control/v1/tenant/{}/migrate", mv.tenant_shard_id),
                                Some(TenantShardMigrateRequest {
                                    tenant_shard_id: mv.tenant_shard_id,
                                    node_id: mv.to,
                                }),
                            )
                            .await
                            .map_err(|e| (mv.tenant_shard_id, mv.from, mv.to, e))
                    }
                })
                .buffered(concurrency.unwrap_or(DEFAULT_MIGRATE_CONCURRENCY));

            let mut success = 0;
            let mut failure = 0;

            while let Some(res) = stream.next().await {
                match res {
                    Ok(_) => {
                        success += 1;
                    }
                    Err((tenant_shard_id, from, to, error)) => {
                        failure += 1;
                        println!(
                            "Failed to migrate {} from node {} to node {}: {}",
                            tenant_shard_id, from, to, error
                        );
                    }
                }

                if (success + failure) % 20 == 0 {
                    println!(
                        "Processed {}/{} shards: {} succeeded, {} failed",
                        success + failure,
                        total_moves,
                        success,
                        failure
                    );
                }
            }

            println!(
                "Processed {}/{} shards: {} succeeded, {} failed",
                success + failure,
                total_moves,
                success,
                failure
            );
        }
        Command::StartDrain { node_id } => {
            storcon_client
                .dispatch::<(), ()>(
                    Method::PUT,
                    format!("control/v1/node/{node_id}/drain"),
                    None,
                )
                .await?;
            println!("Drain started for {node_id}");
        }
        Command::CancelDrain { node_id, timeout } => {
            storcon_client
                .dispatch::<(), ()>(
                    Method::DELETE,
                    format!("control/v1/node/{node_id}/drain"),
                    None,
                )
                .await?;

            println!("Waiting for node {node_id} to quiesce on scheduling policy ...");

            let final_policy =
                wait_for_scheduling_policy(storcon_client, node_id, *timeout, |sched| {
                    use NodeSchedulingPolicy::*;
                    matches!(sched, Active | PauseForRestart)
                })
                .await?;

            println!(
                "Drain was cancelled for node {node_id}. Scheduling policy is now {final_policy:?}"
            );
        }
        Command::StartFill { node_id } => {
            storcon_client
                .dispatch::<(), ()>(Method::PUT, format!("control/v1/node/{node_id}/fill"), None)
                .await?;

            println!("Fill started for {node_id}");
        }
        Command::CancelFill { node_id, timeout } => {
            storcon_client
                .dispatch::<(), ()>(
                    Method::DELETE,
                    format!("control/v1/node/{node_id}/fill"),
                    None,
                )
                .await?;

            println!("Waiting for node {node_id} to quiesce on scheduling policy ...");

            let final_policy =
                wait_for_scheduling_policy(storcon_client, node_id, *timeout, |sched| {
                    use NodeSchedulingPolicy::*;
                    matches!(sched, Active)
                })
                .await?;

            println!(
                "Fill was cancelled for node {node_id}. Scheduling policy is now {final_policy:?}"
            );
        }
    }

    Ok(())
}
```