storage controller: API + CLI for migrating secondary locations (#10284)

## Problem

Currently, if we want to move a secondary there isn't a neat way to do
that: we just have migration API for the attached location, and it is
only clean to use that if you've manually created a secondary via
pageserver API in the place you're going to move it to.

Secondary migration API enables:
- Moving the secondary somewhere because we would like to later move the
attached location there.
- Move the secondary location because we just want to reclaim some disk
space from its current location.

## Summary of changes

- Add `/migrate_secondary` API
- Add `tenant-shard-migrate-secondary` CLI
- Add tests for above
This commit is contained in:
John Spray
2025-01-13 14:52:43 +00:00
committed by GitHub
parent ceacc29609
commit ef8bfacd6b
7 changed files with 210 additions and 32 deletions

View File

@@ -822,10 +822,7 @@ impl StorageController {
self.dispatch(
Method::PUT,
format!("control/v1/tenant/{tenant_shard_id}/migrate"),
Some(TenantShardMigrateRequest {
tenant_shard_id,
node_id,
}),
Some(TenantShardMigrateRequest { node_id }),
)
.await
}

View File

@@ -112,6 +112,13 @@ enum Command {
#[arg(long)]
node: NodeId,
},
/// Migrate the secondary location for a tenant shard to a specific pageserver.
TenantShardMigrateSecondary {
#[arg(long)]
tenant_shard_id: TenantShardId,
#[arg(long)]
node: NodeId,
},
/// Cancel any ongoing reconciliation for this shard
TenantShardCancelReconcile {
#[arg(long)]
@@ -540,10 +547,7 @@ async fn main() -> anyhow::Result<()> {
tenant_shard_id,
node,
} => {
let req = TenantShardMigrateRequest {
tenant_shard_id,
node_id: node,
};
let req = TenantShardMigrateRequest { node_id: node };
storcon_client
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
@@ -553,6 +557,20 @@ async fn main() -> anyhow::Result<()> {
)
.await?;
}
Command::TenantShardMigrateSecondary {
tenant_shard_id,
node,
} => {
let req = TenantShardMigrateRequest { node_id: node };
storcon_client
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
Method::PUT,
format!("control/v1/tenant/{tenant_shard_id}/migrate_secondary"),
Some(req),
)
.await?;
}
Command::TenantShardCancelReconcile { tenant_shard_id } => {
storcon_client
.dispatch::<(), ()>(
@@ -915,10 +933,7 @@ async fn main() -> anyhow::Result<()> {
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
Method::PUT,
format!("control/v1/tenant/{}/migrate", mv.tenant_shard_id),
Some(TenantShardMigrateRequest {
tenant_shard_id: mv.tenant_shard_id,
node_id: mv.to,
}),
Some(TenantShardMigrateRequest { node_id: mv.to }),
)
.await
.map_err(|e| (mv.tenant_shard_id, mv.from, mv.to, e))

View File

@@ -179,7 +179,6 @@ pub struct TenantDescribeResponseShard {
/// specifies some constraints, e.g. asking it to get off particular node(s)
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateRequest {
pub tenant_shard_id: TenantShardId,
pub node_id: NodeId,
}

View File

@@ -124,7 +124,10 @@ impl ComputeHookTenant {
if let Some(shard_idx) = shard_idx {
sharded.shards.remove(shard_idx);
} else {
tracing::warn!("Shard not found while handling detach")
// This is a valid but niche case, where the tenant was previously attached
// as a Secondary location and then detached, so has no previously notified
// state.
tracing::info!("Shard not found while handling detach")
}
}
ComputeHookTenant::Unsharded(_) => {
@@ -761,7 +764,10 @@ impl ComputeHook {
let mut state_locked = self.state.lock().unwrap();
match state_locked.entry(tenant_shard_id.tenant_id) {
Entry::Vacant(_) => {
tracing::warn!("Compute hook tenant not found for detach");
// This is a valid but niche case, where the tenant was previously attached
// as a Secondary location and then detached, so has no previously notified
// state.
tracing::info!("Compute hook tenant not found for detach");
}
Entry::Occupied(mut e) => {
let sharded = e.get().is_sharded();

View File

@@ -690,7 +690,8 @@ async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError
};
let state = get_state(&req);
let nodes = state.service.node_list().await?;
let mut nodes = state.service.node_list().await?;
nodes.sort_by_key(|n| n.get_id());
let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();
json_response(StatusCode::OK, api_nodes)
@@ -1005,6 +1006,29 @@ async fn handle_tenant_shard_migrate(
)
}
async fn handle_tenant_shard_migrate_secondary(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let mut req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};
let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
json_response(
StatusCode::OK,
service
.tenant_shard_migrate_secondary(tenant_shard_id, migrate_req)
.await?,
)
}
async fn handle_tenant_shard_cancel_reconcile(
service: Arc<Service>,
req: Request<Body>,
@@ -1855,6 +1879,16 @@ pub fn make_router(
RequestName("control_v1_tenant_migrate"),
)
})
.put(
"/control/v1/tenant/:tenant_shard_id/migrate_secondary",
|r| {
tenant_service_handler(
r,
handle_tenant_shard_migrate_secondary,
RequestName("control_v1_tenant_migrate_secondary"),
)
},
)
.put(
"/control/v1/tenant/:tenant_shard_id/cancel_reconcile",
|r| {

View File

@@ -5055,6 +5055,69 @@ impl Service {
Ok(TenantShardMigrateResponse {})
}
pub(crate) async fn tenant_shard_migrate_secondary(
&self,
tenant_shard_id: TenantShardId,
migrate_req: TenantShardMigrateRequest,
) -> Result<TenantShardMigrateResponse, ApiError> {
let waiter = {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
let Some(node) = nodes.get(&migrate_req.node_id) else {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Node {} not found",
migrate_req.node_id
)));
};
if !node.is_available() {
// Warn but proceed: the caller may intend to manually adjust the placement of
// a shard even if the node is down, e.g. if intervening during an incident.
tracing::warn!("Migrating to unavailable node {node}");
}
let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant shard not found").into(),
));
};
if shard.intent.get_secondary().len() == 1
&& shard.intent.get_secondary()[0] == migrate_req.node_id
{
tracing::info!(
"Migrating secondary to {node}: intent is unchanged {:?}",
shard.intent
);
} else if shard.intent.get_attached() == &Some(migrate_req.node_id) {
tracing::info!("Migrating secondary to {node}: already attached where we were asked to create a secondary");
} else {
let old_secondaries = shard.intent.get_secondary().clone();
for secondary in old_secondaries {
shard.intent.remove_secondary(scheduler, secondary);
}
shard.intent.push_secondary(scheduler, migrate_req.node_id);
shard.sequence = shard.sequence.next();
tracing::info!(
"Migrating secondary to {node}: new intent {:?}",
shard.intent
);
}
self.maybe_reconcile_shard(shard, nodes)
};
if let Some(waiter) = waiter {
waiter.wait_timeout(RECONCILE_TIMEOUT).await?;
} else {
tracing::info!("Migration is a no-op");
}
Ok(TenantShardMigrateResponse {})
}
/// 'cancel' in this context means cancel any ongoing reconcile
pub(crate) async fn tenant_shard_cancel_reconcile(
&self,

View File

@@ -1052,7 +1052,7 @@ def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder):
that just hits the endpoints to check that they don't bitrot.
"""
neon_env_builder.num_pageservers = 2
neon_env_builder.num_pageservers = 3
env = neon_env_builder.init_start()
tenant_id = TenantId.generate()
@@ -1077,7 +1077,7 @@ def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder):
"GET", f"{env.storage_controller_api}/debug/v1/scheduler"
)
# Two nodes, in a dict of node_id->node
assert len(response.json()["nodes"]) == 2
assert len(response.json()["nodes"]) == 3
assert sum(v["shard_count"] for v in response.json()["nodes"].values()) == 3
assert all(v["may_schedule"] for v in response.json()["nodes"].values())
@@ -1088,13 +1088,25 @@ def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder):
headers=env.storage_controller.headers(TokenScope.ADMIN),
)
# Secondary migration API: superficial check that it migrates
secondary_dest = env.pageservers[2].id
env.storage_controller.request(
"PUT",
f"{env.storage_controller_api}/control/v1/tenant/{tenant_id}-0002/migrate_secondary",
headers=env.storage_controller.headers(TokenScope.ADMIN),
json={"tenant_shard_id": f"{tenant_id}-0002", "node_id": secondary_dest},
)
assert env.storage_controller.tenant_describe(tenant_id)["shards"][0]["node_secondary"] == [
secondary_dest
]
# Node unclean drop API
response = env.storage_controller.request(
"POST",
f"{env.storage_controller_api}/debug/v1/node/{env.pageservers[1].id}/drop",
headers=env.storage_controller.headers(TokenScope.ADMIN),
)
assert len(env.storage_controller.node_list()) == 1
assert len(env.storage_controller.node_list()) == 2
# Tenant unclean drop API
response = env.storage_controller.request(
@@ -1812,7 +1824,13 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
"""
output_dir = neon_env_builder.test_output_dir
shard_count = 4
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
env.start()
tenant_id = TenantId.generate()
env.create_tenant(tenant_id, placement_policy='{"Attached":1}', shard_count=shard_count)
base_args = [env.neon_binpath / "storcon_cli", "--api", env.storage_controller_api]
def storcon_cli(args):
@@ -1841,7 +1859,7 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
# List nodes
node_lines = storcon_cli(["nodes"])
# Table header, footer, and one line of data
assert len(node_lines) == 5
assert len(node_lines) == 7
assert "localhost" in node_lines[3]
# Pause scheduling onto a node
@@ -1859,10 +1877,21 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
storcon_cli(["node-configure", "--node-id", "1", "--availability", "offline"])
assert "Offline" in storcon_cli(["nodes"])[3]
# Restore node, verify status changes in CLI output
env.pageservers[0].start()
def is_online():
assert "Offline" not in storcon_cli(["nodes"])
wait_until(is_online)
# Let everything stabilize after node failure to avoid interfering with subsequent steps
env.storage_controller.reconcile_until_idle(timeout_secs=10)
# List tenants
tenant_lines = storcon_cli(["tenants"])
assert len(tenant_lines) == 5
assert str(env.initial_tenant) in tenant_lines[3]
assert str(tenant_id) in tenant_lines[3]
# Setting scheduling policies intentionally result in warnings, they're for rare use.
env.storage_controller.allowed_errors.extend(
@@ -1870,23 +1899,58 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
)
# Describe a tenant
tenant_lines = storcon_cli(["tenant-describe", "--tenant-id", str(env.initial_tenant)])
tenant_lines = storcon_cli(["tenant-describe", "--tenant-id", str(tenant_id)])
assert len(tenant_lines) >= 3 + shard_count * 2
assert str(env.initial_tenant) in tenant_lines[0]
assert str(tenant_id) in tenant_lines[0]
# Migrate an attached location
def other_ps_id(current_ps_id):
return (
env.pageservers[0].id
if current_ps_id == env.pageservers[1].id
else env.pageservers[1].id
)
storcon_cli(
[
"tenant-shard-migrate",
"--tenant-shard-id",
f"{tenant_id}-0004",
"--node",
str(
other_ps_id(
env.storage_controller.tenant_describe(tenant_id)["shards"][0]["node_attached"]
)
),
]
)
# Migrate a secondary location
storcon_cli(
[
"tenant-shard-migrate-secondary",
"--tenant-shard-id",
f"{tenant_id}-0004",
"--node",
str(
other_ps_id(
env.storage_controller.tenant_describe(tenant_id)["shards"][0][
"node_secondary"
][0]
)
),
]
)
# Pause changes on a tenant
storcon_cli(["tenant-policy", "--tenant-id", str(env.initial_tenant), "--scheduling", "stop"])
storcon_cli(["tenant-policy", "--tenant-id", str(tenant_id), "--scheduling", "stop"])
assert "Stop" in storcon_cli(["tenants"])[3]
# Cancel ongoing reconcile on a tenant
storcon_cli(
["tenant-shard-cancel-reconcile", "--tenant-shard-id", f"{env.initial_tenant}-0104"]
)
storcon_cli(["tenant-shard-cancel-reconcile", "--tenant-shard-id", f"{tenant_id}-0104"])
# Change a tenant's placement
storcon_cli(
["tenant-policy", "--tenant-id", str(env.initial_tenant), "--placement", "secondary"]
)
storcon_cli(["tenant-policy", "--tenant-id", str(tenant_id), "--placement", "secondary"])
assert "Secondary" in storcon_cli(["tenants"])[3]
# Modify a tenant's config
@@ -1894,7 +1958,7 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
[
"patch-tenant-config",
"--tenant-id",
str(env.initial_tenant),
str(tenant_id),
"--config",
json.dumps({"pitr_interval": "1m"}),
]