mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 13:10:38 +00:00
storage controller: tech debt (#7165)
This is a mixed bag of changes split out for separate review while working on other things, and batched together to reduce load on CI runners. Each commit stands alone for review purposes: - do_tenant_shard_split was a long function and had a synchronous validation phase at the start that could readily be pulled out into a separate function. This also avoids the special casing of ApiError::BadRequest when deciding whether an abort is needed on errors. - Add a 'describe' API (GET on tenant ID) that will enable storcon-cli to see what's going on with a tenant. - The 'locate' API wasn't really meant for use in the field. It's for tests: demote it to the /debug/ prefix. - The `Single` placement policy was a redundant duplicate of Double(0), and Double was a bad name. Rename it Attached. (https://github.com/neondatabase/neon/issues/7107) - Some neon_local commands were added for debug/demos, which are now replaced by commands in storcon-cli (#7114). Even though that's not merged yet, we don't need the neon_local ones any more. Closes https://github.com/neondatabase/neon/issues/7107 ## Backward compat of Single/Double -> `Attached(n)` change A database migration is used to convert any existing values.
This commit is contained in:
@@ -0,0 +1,3 @@
|
||||
|
||||
-- Rewrites `Attached` placement policies back to the legacy `Double` / `Single`
-- encodings (presumably the down migration of the rename -- confirm against the file name).
-- Only the values Attached(1) and Attached(0) are converted; other counts are left untouched.
-- NOTE(review): these literals have a space after the colon, whereas the serde_json encoding
-- asserted in the Rust tests is compact (`{"Attached":1}`). Confirm the column type makes
-- these comparisons match (e.g. a JSON-typed column vs. plain text).
UPDATE tenant_shards set placement_policy='{"Double": 1}' where placement_policy='{"Attached": 1}';
|
||||
UPDATE tenant_shards set placement_policy='"Single"' where placement_policy='{"Attached": 0}';
|
||||
@@ -0,0 +1,3 @@
|
||||
|
||||
-- Converts the legacy placement policy encodings to the renamed `Attached(n)` form:
-- `Double(1)` becomes `Attached(1)` and `Single` becomes `Attached(0)`.
-- Only `Double` with a count of exactly 1 is converted; other counts are left untouched.
-- NOTE(review): these literals have a space after the colon, whereas the serde_json encoding
-- asserted in the Rust tests is compact (`{"Attached":1}`). Confirm the column type makes
-- these comparisons match (e.g. a JSON-typed column vs. plain text).
UPDATE tenant_shards set placement_policy='{"Attached": 1}' where placement_policy='{"Double": 1}';
|
||||
UPDATE tenant_shards set placement_policy='{"Attached": 0}' where placement_policy='"Single"';
|
||||
@@ -353,6 +353,16 @@ async fn handle_tenant_locate(
|
||||
json_response(StatusCode::OK, service.tenant_locate(tenant_id)?)
|
||||
}
|
||||
|
||||
async fn handle_tenant_describe(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
||||
json_response(StatusCode::OK, service.tenant_describe(tenant_id)?)
|
||||
}
|
||||
|
||||
async fn handle_node_register(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
@@ -559,6 +569,9 @@ pub fn make_router(
|
||||
request_span(r, handle_node_drop)
|
||||
})
|
||||
.get("/debug/v1/tenant", |r| request_span(r, handle_tenants_dump))
|
||||
.get("/debug/v1/tenant/:tenant_id/locate", |r| {
|
||||
tenant_service_handler(r, handle_tenant_locate)
|
||||
})
|
||||
.get("/debug/v1/scheduler", |r| {
|
||||
request_span(r, handle_scheduler_dump)
|
||||
})
|
||||
@@ -568,9 +581,6 @@ pub fn make_router(
|
||||
.put("/debug/v1/failpoints", |r| {
|
||||
request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
|
||||
})
|
||||
.get("/control/v1/tenant/:tenant_id/locate", |r| {
|
||||
tenant_service_handler(r, handle_tenant_locate)
|
||||
})
|
||||
// Node operations
|
||||
.post("/control/v1/node", |r| {
|
||||
request_span(r, handle_node_register)
|
||||
@@ -586,6 +596,9 @@ pub fn make_router(
|
||||
.put("/control/v1/tenant/:tenant_id/shard_split", |r| {
|
||||
tenant_service_handler(r, handle_tenant_shard_split)
|
||||
})
|
||||
.get("/control/v1/tenant/:tenant_id", |r| {
|
||||
tenant_service_handler(r, handle_tenant_describe)
|
||||
})
|
||||
// Tenant operations
|
||||
// The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
|
||||
// this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
|
||||
|
||||
@@ -211,15 +211,10 @@ impl Persistence {
|
||||
|
||||
let mut decoded = serde_json::from_slice::<JsonPersistence>(&bytes)
|
||||
.map_err(|e| DatabaseError::Logical(format!("Deserialization error: {e}")))?;
|
||||
for (tenant_id, tenant) in &mut decoded.tenants {
|
||||
// Backward compat: an old attachments.json from before PR #6251, replace
|
||||
// empty strings with proper defaults.
|
||||
if tenant.tenant_id.is_empty() {
|
||||
tenant.tenant_id = tenant_id.to_string();
|
||||
tenant.config = serde_json::to_string(&TenantConfig::default())
|
||||
.map_err(|e| DatabaseError::Logical(format!("Serialization error: {e}")))?;
|
||||
tenant.placement_policy = serde_json::to_string(&PlacementPolicy::Single)
|
||||
.map_err(|e| DatabaseError::Logical(format!("Serialization error: {e}")))?;
|
||||
for shard in decoded.tenants.values_mut() {
|
||||
if shard.placement_policy == "\"Single\"" {
|
||||
// Backward compat for test data after PR https://github.com/neondatabase/neon/pull/7165
|
||||
shard.placement_policy = "{\"Attached\":0}".to_string();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -475,7 +475,7 @@ impl Reconciler {
|
||||
}
|
||||
}
|
||||
|
||||
// Downgrade the origin to secondary. If the tenant's policy is PlacementPolicy::Single, then
|
||||
// Downgrade the origin to secondary. If the tenant's policy is PlacementPolicy::Attached(0), then
|
||||
// this location will be deleted in the general case reconciliation that runs after this.
|
||||
let origin_secondary_conf = build_location_config(
|
||||
&self.shard,
|
||||
|
||||
@@ -20,8 +20,9 @@ use hyper::StatusCode;
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
|
||||
TenantCreateResponse, TenantCreateResponseShard, TenantLocateResponse,
|
||||
TenantShardMigrateRequest, TenantShardMigrateResponse, UtilizationScore,
|
||||
TenantCreateResponse, TenantCreateResponseShard, TenantDescribeResponse,
|
||||
TenantDescribeResponseShard, TenantLocateResponse, TenantShardMigrateRequest,
|
||||
TenantShardMigrateResponse, UtilizationScore,
|
||||
},
|
||||
models::{SecondaryProgress, TenantConfigRequest},
|
||||
};
|
||||
@@ -202,6 +203,29 @@ enum TenantCreateOrUpdate {
|
||||
Update(Vec<ShardUpdate>),
|
||||
}
|
||||
|
||||
/// Validated inputs for executing a shard split: built by `prepare_tenant_shard_split`
/// and consumed (destructured) by `do_tenant_shard_split`.
struct ShardSplitParams {
|
||||
/// Shard count determined while validating the tenant's existing shards.
old_shard_count: ShardCount,
|
||||
/// Target shard count, taken from the split request's `new_shard_count`.
new_shard_count: ShardCount,
|
||||
/// Stripe size taken from the split request, if one was given.
new_stripe_size: Option<ShardStripeSize>,
|
||||
/// The parent shards that will be split.
targets: Vec<ShardSplitTarget>,
|
||||
/// Placement policy gathered during validation.
policy: PlacementPolicy,
|
||||
/// Shard identity gathered during validation.
// NOTE(review): presumably that of the parent shards -- confirm in prepare_tenant_shard_split.
shard_ident: ShardIdentity,
|
||||
}
|
||||
|
||||
/// Outcome of `prepare_tenant_shard_split`.
///
/// When preparing for a shard split, we may either choose to proceed with the split,
|
||||
/// or find that the work is already done and return NoOp.
|
||||
enum ShardSplitAction {
|
||||
/// Proceed with the split using these validated parameters.
Split(ShardSplitParams),
|
||||
/// Nothing to do: the caller returns this response directly.
NoOp(TenantShardSplitResponse),
|
||||
}
|
||||
|
||||
/// A parent shard which will be split
|
||||
struct ShardSplitTarget {
|
||||
/// Id of the parent shard being split.
parent_id: TenantShardId,
|
||||
/// Node recorded for this parent during validation.
// NOTE(review): presumably the pageserver the parent is attached to -- confirm.
node: Node,
|
||||
/// Ids of the child shards derived from this parent.
child_ids: Vec<TenantShardId>,
|
||||
}
|
||||
|
||||
/// When a tenant shard split operation fails, we may not be able to clean up immediately, because nodes
|
||||
/// might not be available. We therefore use a queue of abort operations processed in the background.
|
||||
struct TenantShardSplitAbort {
|
||||
@@ -1071,7 +1095,7 @@ impl Service {
|
||||
shard_stripe_size: 0,
|
||||
generation: Some(0),
|
||||
generation_pageserver: None,
|
||||
placement_policy: serde_json::to_string(&PlacementPolicy::Single).unwrap(),
|
||||
placement_policy: serde_json::to_string(&PlacementPolicy::Attached(0)).unwrap(),
|
||||
config: serde_json::to_string(&TenantConfig::default()).unwrap(),
|
||||
splitting: SplitState::default(),
|
||||
};
|
||||
@@ -1098,7 +1122,7 @@ impl Service {
|
||||
TenantState::new(
|
||||
attach_req.tenant_shard_id,
|
||||
ShardIdentity::unsharded(),
|
||||
PlacementPolicy::Single,
|
||||
PlacementPolicy::Attached(0),
|
||||
),
|
||||
);
|
||||
tracing::info!("Inserted shard {} in memory", attach_req.tenant_shard_id);
|
||||
@@ -1127,7 +1151,7 @@ impl Service {
|
||||
self.persistence
|
||||
.update_tenant_shard(
|
||||
attach_req.tenant_shard_id,
|
||||
PlacementPolicy::Single,
|
||||
PlacementPolicy::Attached(0),
|
||||
conf,
|
||||
None,
|
||||
)
|
||||
@@ -1152,7 +1176,7 @@ impl Service {
|
||||
|
||||
if let Some(new_generation) = new_generation {
|
||||
tenant_state.generation = Some(new_generation);
|
||||
tenant_state.policy = PlacementPolicy::Single;
|
||||
tenant_state.policy = PlacementPolicy::Attached(0);
|
||||
} else {
|
||||
// This is a detach notification. We must update placement policy to avoid re-attaching
|
||||
// during background scheduling/reconciliation, or during storage controller restart.
|
||||
@@ -1505,11 +1529,11 @@ impl Service {
|
||||
&self,
|
||||
create_req: TenantCreateRequest,
|
||||
) -> Result<(TenantCreateResponse, Vec<ReconcilerWaiter>), ApiError> {
|
||||
// As a default, single is convenient for tests that don't choose a policy.
|
||||
let placement_policy = create_req
|
||||
.placement_policy
|
||||
.clone()
|
||||
.unwrap_or(PlacementPolicy::Single);
|
||||
// As a default, zero secondaries is convenient for tests that don't choose a policy.
|
||||
.unwrap_or(PlacementPolicy::Attached(0));
|
||||
|
||||
// This service expects to handle sharding itself: it is an error to try and directly create
|
||||
// a particular shard here.
|
||||
@@ -1719,11 +1743,11 @@ impl Service {
|
||||
| LocationConfigMode::AttachedSingle
|
||||
| LocationConfigMode::AttachedStale => {
|
||||
if nodes.len() > 1 {
|
||||
PlacementPolicy::Double(1)
|
||||
PlacementPolicy::Attached(1)
|
||||
} else {
|
||||
// Convenience for dev/test: if we just have one pageserver, import
|
||||
// tenants into Single mode so that scheduling will succeed.
|
||||
PlacementPolicy::Single
|
||||
// tenants into non-HA mode so that scheduling will succeed.
|
||||
PlacementPolicy::Attached(0)
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -2541,9 +2565,6 @@ impl Service {
|
||||
let locked = self.inner.read().unwrap();
|
||||
tracing::info!("Locating shards for tenant {tenant_id}");
|
||||
|
||||
// Take a snapshot of pageservers
|
||||
let pageservers = locked.nodes.clone();
|
||||
|
||||
let mut result = Vec::new();
|
||||
let mut shard_params: Option<ShardParameters> = None;
|
||||
|
||||
@@ -2557,7 +2578,8 @@ impl Service {
|
||||
"Cannot locate a tenant that is not attached"
|
||||
)))?;
|
||||
|
||||
let node = pageservers
|
||||
let node = locked
|
||||
.nodes
|
||||
.get(&node_id)
|
||||
.expect("Pageservers may not be deleted while referenced");
|
||||
|
||||
@@ -2605,6 +2627,47 @@ impl Service {
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn tenant_describe(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<TenantDescribeResponse, ApiError> {
|
||||
let locked = self.inner.read().unwrap();
|
||||
|
||||
let mut shard_zero = None;
|
||||
let mut shards = Vec::new();
|
||||
|
||||
for (tenant_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id))
|
||||
{
|
||||
if tenant_shard_id.is_zero() {
|
||||
shard_zero = Some(shard);
|
||||
}
|
||||
|
||||
let response_shard = TenantDescribeResponseShard {
|
||||
tenant_shard_id: *tenant_shard_id,
|
||||
node_attached: *shard.intent.get_attached(),
|
||||
node_secondary: shard.intent.get_secondary().to_vec(),
|
||||
last_error: shard.last_error.lock().unwrap().clone(),
|
||||
is_reconciling: shard.reconciler.is_some(),
|
||||
is_pending_compute_notification: shard.pending_compute_notification,
|
||||
is_splitting: matches!(shard.splitting, SplitState::Splitting),
|
||||
};
|
||||
shards.push(response_shard);
|
||||
}
|
||||
|
||||
let Some(shard_zero) = shard_zero else {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant {tenant_id} not found").into(),
|
||||
));
|
||||
};
|
||||
|
||||
Ok(TenantDescribeResponse {
|
||||
shards,
|
||||
stripe_size: shard_zero.shard.stripe_size,
|
||||
policy: shard_zero.policy.clone(),
|
||||
config: shard_zero.config.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id=%op.tenant_id))]
|
||||
async fn abort_tenant_shard_split(
|
||||
&self,
|
||||
@@ -2828,7 +2891,7 @@ impl Service {
|
||||
generation,
|
||||
&child_shard,
|
||||
&config,
|
||||
matches!(policy, PlacementPolicy::Double(n) if n > 0),
|
||||
matches!(policy, PlacementPolicy::Attached(n) if n > 0),
|
||||
)),
|
||||
},
|
||||
);
|
||||
@@ -2875,17 +2938,23 @@ impl Service {
|
||||
let new_shard_count = ShardCount::new(split_req.new_shard_count);
|
||||
let new_stripe_size = split_req.new_stripe_size;
|
||||
|
||||
let r = self.do_tenant_shard_split(tenant_id, split_req).await;
|
||||
// Validate the request and construct parameters. This phase is fallible, but does not require
|
||||
// rollback on errors, as it does no I/O and mutates no state.
|
||||
let shard_split_params = match self.prepare_tenant_shard_split(tenant_id, split_req)? {
|
||||
ShardSplitAction::NoOp(resp) => return Ok(resp),
|
||||
ShardSplitAction::Split(params) => params,
|
||||
};
|
||||
|
||||
// Execute this split: this phase mutates state and does remote I/O on pageservers. If it fails,
|
||||
// we must roll back.
|
||||
let r = self
|
||||
.do_tenant_shard_split(tenant_id, shard_split_params)
|
||||
.await;
|
||||
|
||||
match r {
|
||||
Ok(r) => Ok(r),
|
||||
Err(ApiError::BadRequest(_)) => {
|
||||
// A request validation error does not require rollback: we rejected it before we started making any changes: just
|
||||
// return the error
|
||||
r
|
||||
}
|
||||
Err(e) => {
|
||||
// General case error handling: split might be part-done, we must do work to abort it.
|
||||
// Split might be part-done, we must do work to abort it.
|
||||
tracing::warn!("Enqueuing background abort of split on {tenant_id}");
|
||||
self.abort_tx
|
||||
.send(TenantShardSplitAbort {
|
||||
@@ -2901,25 +2970,17 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn do_tenant_shard_split(
|
||||
fn prepare_tenant_shard_split(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
split_req: TenantShardSplitRequest,
|
||||
) -> Result<TenantShardSplitResponse, ApiError> {
|
||||
let mut policy = None;
|
||||
let mut shard_ident = None;
|
||||
|
||||
// A parent shard which will be split
|
||||
struct SplitTarget {
|
||||
parent_id: TenantShardId,
|
||||
node: Node,
|
||||
child_ids: Vec<TenantShardId>,
|
||||
}
|
||||
|
||||
) -> Result<ShardSplitAction, ApiError> {
|
||||
fail::fail_point!("shard-split-validation", |_| Err(ApiError::BadRequest(
|
||||
anyhow::anyhow!("failpoint")
|
||||
)));
|
||||
|
||||
let mut policy = None;
|
||||
let mut shard_ident = None;
|
||||
// Validate input, and calculate which shards we will create
|
||||
let (old_shard_count, targets) =
|
||||
{
|
||||
@@ -2995,7 +3056,7 @@ impl Service {
|
||||
|
||||
// TODO: if any reconciliation is currently in progress for this shard, wait for it.
|
||||
|
||||
targets.push(SplitTarget {
|
||||
targets.push(ShardSplitTarget {
|
||||
parent_id: *tenant_shard_id,
|
||||
node: node.clone(),
|
||||
child_ids: tenant_shard_id
|
||||
@@ -3005,9 +3066,9 @@ impl Service {
|
||||
|
||||
if targets.is_empty() {
|
||||
if children_found.len() == split_req.new_shard_count as usize {
|
||||
return Ok(TenantShardSplitResponse {
|
||||
return Ok(ShardSplitAction::NoOp(TenantShardSplitResponse {
|
||||
new_shards: children_found,
|
||||
});
|
||||
}));
|
||||
} else {
|
||||
// No shards found to split, and no existing children found: the
|
||||
// tenant doesn't exist at all.
|
||||
@@ -3038,12 +3099,36 @@ impl Service {
|
||||
};
|
||||
let policy = policy.unwrap();
|
||||
|
||||
Ok(ShardSplitAction::Split(ShardSplitParams {
|
||||
old_shard_count,
|
||||
new_shard_count: ShardCount::new(split_req.new_shard_count),
|
||||
new_stripe_size: split_req.new_stripe_size,
|
||||
targets,
|
||||
policy,
|
||||
shard_ident,
|
||||
}))
|
||||
}
|
||||
|
||||
async fn do_tenant_shard_split(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
params: ShardSplitParams,
|
||||
) -> Result<TenantShardSplitResponse, ApiError> {
|
||||
// FIXME: we have dropped self.inner lock, and not yet written anything to the database: another
|
||||
// request could occur here, deleting or mutating the tenant. begin_shard_split checks that the
|
||||
// parent shards exist as expected, but it would be neater to do the above pre-checks within the
|
||||
// same database transaction rather than pre-check in-memory and then maybe-fail the database write.
|
||||
// (https://github.com/neondatabase/neon/issues/6676)
|
||||
|
||||
let ShardSplitParams {
|
||||
old_shard_count,
|
||||
new_shard_count,
|
||||
new_stripe_size,
|
||||
targets,
|
||||
policy,
|
||||
shard_ident,
|
||||
} = params;
|
||||
|
||||
// Before creating any new child shards in memory or on the pageservers, persist them: this
|
||||
// enables us to ensure that we will always be able to clean up if something goes wrong. This also
|
||||
// acts as the protection against two concurrent attempts to split: one of them will get a database
|
||||
@@ -3125,7 +3210,7 @@ impl Service {
|
||||
// N>1 shards into M shards -- initially we're usually splitting 1 shard into N).
|
||||
|
||||
for target in &targets {
|
||||
let SplitTarget {
|
||||
let ShardSplitTarget {
|
||||
parent_id,
|
||||
node,
|
||||
child_ids,
|
||||
@@ -3135,8 +3220,8 @@ impl Service {
|
||||
.tenant_shard_split(
|
||||
*parent_id,
|
||||
TenantShardSplitRequest {
|
||||
new_shard_count: split_req.new_shard_count,
|
||||
new_stripe_size: split_req.new_stripe_size,
|
||||
new_shard_count: new_shard_count.literal(),
|
||||
new_stripe_size,
|
||||
},
|
||||
)
|
||||
.await
|
||||
@@ -3185,11 +3270,8 @@ impl Service {
|
||||
));
|
||||
|
||||
// Replace all the shards we just split with their children: this phase is infallible.
|
||||
let (response, child_locations) = self.tenant_shard_split_commit_inmem(
|
||||
tenant_id,
|
||||
ShardCount::new(split_req.new_shard_count),
|
||||
split_req.new_stripe_size,
|
||||
);
|
||||
let (response, child_locations) =
|
||||
self.tenant_shard_split_commit_inmem(tenant_id, new_shard_count, new_stripe_size);
|
||||
|
||||
// Send compute notifications for all the new shards
|
||||
let mut failed_notifications = Vec::new();
|
||||
@@ -3254,17 +3336,15 @@ impl Service {
|
||||
let old_attached = *shard.intent.get_attached();
|
||||
|
||||
match shard.policy {
|
||||
PlacementPolicy::Single => {
|
||||
shard.intent.clear_secondary(scheduler);
|
||||
shard.intent.set_attached(scheduler, Some(migrate_req.node_id));
|
||||
}
|
||||
PlacementPolicy::Double(_n) => {
|
||||
PlacementPolicy::Attached(n) => {
|
||||
// If our new attached node was a secondary, it no longer should be.
|
||||
shard.intent.remove_secondary(scheduler, migrate_req.node_id);
|
||||
|
||||
// If we were already attached to something, demote that to a secondary
|
||||
if let Some(old_attached) = old_attached {
|
||||
shard.intent.push_secondary(scheduler, old_attached);
|
||||
if n > 0 {
|
||||
shard.intent.push_secondary(scheduler, old_attached);
|
||||
}
|
||||
}
|
||||
|
||||
shard.intent.set_attached(scheduler, Some(migrate_req.node_id));
|
||||
|
||||
@@ -457,22 +457,7 @@ impl TenantState {
|
||||
// Add/remove nodes to fulfil policy
|
||||
use PlacementPolicy::*;
|
||||
match self.policy {
|
||||
Single => {
|
||||
// Should have exactly one attached, and zero secondaries
|
||||
if !self.intent.secondary.is_empty() {
|
||||
self.intent.clear_secondary(scheduler);
|
||||
modified = true;
|
||||
}
|
||||
|
||||
let (modified_attached, _attached_node_id) = self.schedule_attached(scheduler)?;
|
||||
modified |= modified_attached;
|
||||
|
||||
if !self.intent.secondary.is_empty() {
|
||||
self.intent.clear_secondary(scheduler);
|
||||
modified = true;
|
||||
}
|
||||
}
|
||||
Double(secondary_count) => {
|
||||
Attached(secondary_count) => {
|
||||
let retain_secondaries = if self.intent.attached.is_none()
|
||||
&& scheduler.node_preferred(&self.intent.secondary).is_some()
|
||||
{
|
||||
@@ -895,7 +880,7 @@ pub(crate) mod tests {
|
||||
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Double(1));
|
||||
let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
tenant_state
|
||||
.schedule(&mut scheduler)
|
||||
.expect("we have enough nodes, scheduling should work");
|
||||
@@ -943,7 +928,7 @@ pub(crate) mod tests {
|
||||
let nodes = make_test_nodes(3);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Double(1));
|
||||
let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
|
||||
tenant_state.observed.locations.insert(
|
||||
NodeId(3),
|
||||
|
||||
@@ -437,7 +437,7 @@ async fn handle_tenant(
|
||||
|
||||
let placement_policy = match create_match.get_one::<String>("placement-policy") {
|
||||
Some(s) if !s.is_empty() => serde_json::from_str::<PlacementPolicy>(s)?,
|
||||
_ => PlacementPolicy::Single,
|
||||
_ => PlacementPolicy::Attached(0),
|
||||
};
|
||||
|
||||
let tenant_conf = PageServerNode::parse_config(tenant_conf)?;
|
||||
@@ -523,88 +523,6 @@ async fn handle_tenant(
|
||||
.with_context(|| format!("Tenant config failed for tenant with id {tenant_id}"))?;
|
||||
println!("tenant {tenant_id} successfully configured on the pageserver");
|
||||
}
|
||||
Some(("migrate", matches)) => {
|
||||
let tenant_shard_id = get_tenant_shard_id(matches, env)?;
|
||||
let new_pageserver = get_pageserver(env, matches)?;
|
||||
let new_pageserver_id = new_pageserver.conf.id;
|
||||
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
storage_controller
|
||||
.tenant_migrate(tenant_shard_id, new_pageserver_id)
|
||||
.await?;
|
||||
|
||||
println!("tenant {tenant_shard_id} migrated to {}", new_pageserver_id);
|
||||
}
|
||||
Some(("status", matches)) => {
|
||||
let tenant_id = get_tenant_id(matches, env)?;
|
||||
|
||||
let mut shard_table = comfy_table::Table::new();
|
||||
shard_table.set_header(["Shard", "Pageserver", "Physical Size"]);
|
||||
|
||||
let mut tenant_synthetic_size = None;
|
||||
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
for shard in storage_controller.tenant_locate(tenant_id).await?.shards {
|
||||
let pageserver =
|
||||
PageServerNode::from_env(env, env.get_pageserver_conf(shard.node_id)?);
|
||||
|
||||
let size = pageserver
|
||||
.http_client
|
||||
.tenant_details(shard.shard_id)
|
||||
.await?
|
||||
.tenant_info
|
||||
.current_physical_size
|
||||
.unwrap();
|
||||
|
||||
shard_table.add_row([
|
||||
format!("{}", shard.shard_id.shard_slug()),
|
||||
format!("{}", shard.node_id.0),
|
||||
format!("{} MiB", size / (1024 * 1024)),
|
||||
]);
|
||||
|
||||
if shard.shard_id.is_zero() {
|
||||
tenant_synthetic_size =
|
||||
Some(pageserver.tenant_synthetic_size(shard.shard_id).await?);
|
||||
}
|
||||
}
|
||||
|
||||
let Some(synthetic_size) = tenant_synthetic_size else {
|
||||
bail!("Shard 0 not found")
|
||||
};
|
||||
|
||||
let mut tenant_table = comfy_table::Table::new();
|
||||
tenant_table.add_row(["Tenant ID".to_string(), tenant_id.to_string()]);
|
||||
tenant_table.add_row([
|
||||
"Synthetic size".to_string(),
|
||||
format!("{} MiB", synthetic_size.size.unwrap_or(0) / (1024 * 1024)),
|
||||
]);
|
||||
|
||||
println!("{tenant_table}");
|
||||
println!("{shard_table}");
|
||||
}
|
||||
Some(("shard-split", matches)) => {
|
||||
let tenant_id = get_tenant_id(matches, env)?;
|
||||
let shard_count: u8 = matches.get_one::<u8>("shard-count").cloned().unwrap_or(0);
|
||||
let shard_stripe_size: Option<ShardStripeSize> = matches
|
||||
.get_one::<Option<ShardStripeSize>>("shard-stripe-size")
|
||||
.cloned()
|
||||
.unwrap();
|
||||
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
let result = storage_controller
|
||||
.tenant_split(tenant_id, shard_count, shard_stripe_size)
|
||||
.await?;
|
||||
println!(
|
||||
"Split tenant {} into shards {}",
|
||||
tenant_id,
|
||||
result
|
||||
.new_shards
|
||||
.iter()
|
||||
.map(|s| format!("{:?}", s))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",")
|
||||
);
|
||||
}
|
||||
|
||||
Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name),
|
||||
None => bail!("no tenant subcommand provided"),
|
||||
@@ -1578,19 +1496,6 @@ fn cli() -> Command {
|
||||
.subcommand(Command::new("config")
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(Arg::new("config").short('c').num_args(1).action(ArgAction::Append).required(false)))
|
||||
.subcommand(Command::new("migrate")
|
||||
.about("Migrate a tenant from one pageserver to another")
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(pageserver_id_arg.clone()))
|
||||
.subcommand(Command::new("status")
|
||||
.about("Human readable summary of the tenant's shards and attachment locations")
|
||||
.arg(tenant_id_arg.clone()))
|
||||
.subcommand(Command::new("shard-split")
|
||||
.about("Increase the number of shards in the tenant")
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)"))
|
||||
.arg(Arg::new("shard-stripe-size").value_parser(value_parser!(u32)).long("shard-stripe-size").action(ArgAction::Set).help("Sharding stripe size in pages"))
|
||||
)
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("pageserver")
|
||||
|
||||
@@ -475,7 +475,7 @@ impl StorageController {
|
||||
pub async fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result<TenantLocateResponse> {
|
||||
self.dispatch::<(), _>(
|
||||
Method::GET,
|
||||
format!("control/v1/tenant/{tenant_id}/locate"),
|
||||
format!("debug/v1/tenant/{tenant_id}/locate"),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -6,7 +6,10 @@ use std::str::FromStr;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::id::NodeId;
|
||||
|
||||
use crate::{models::ShardParameters, shard::TenantShardId};
|
||||
use crate::{
|
||||
models::{ShardParameters, TenantConfig},
|
||||
shard::{ShardStripeSize, TenantShardId},
|
||||
};
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct TenantCreateResponseShard {
|
||||
@@ -57,6 +60,31 @@ pub struct TenantLocateResponse {
|
||||
pub shard_params: ShardParameters,
|
||||
}
|
||||
|
||||
/// Response body of the storage controller's tenant 'describe' API.
///
/// The tenant-wide fields (`stripe_size`, `policy`, `config`) are filled in by the
/// controller from the tenant's shard zero.
#[derive(Serialize, Deserialize)]
|
||||
pub struct TenantDescribeResponse {
|
||||
/// One entry per shard of the tenant.
pub shards: Vec<TenantDescribeResponseShard>,
|
||||
/// Stripe size taken from shard zero's shard identity.
pub stripe_size: ShardStripeSize,
|
||||
/// Placement policy taken from shard zero.
pub policy: PlacementPolicy,
|
||||
/// Tenant config taken from shard zero.
pub config: TenantConfig,
|
||||
}
|
||||
|
||||
/// Per-shard entry within a [`TenantDescribeResponse`].
#[derive(Serialize, Deserialize)]
|
||||
pub struct TenantDescribeResponseShard {
|
||||
/// Id of the shard this entry describes.
pub tenant_shard_id: TenantShardId,
|
||||
|
||||
/// Attached node from the shard's intent state, if any.
pub node_attached: Option<NodeId>,
|
||||
/// Secondary nodes from the shard's intent state.
pub node_secondary: Vec<NodeId>,
|
||||
|
||||
/// Copy of the shard's `last_error` string as stored by the controller.
// NOTE(review): presumably empty when there has been no error -- confirm.
pub last_error: String,
|
||||
|
||||
/// A task is currently running to reconcile this tenant's intent state with the state on pageservers
|
||||
pub is_reconciling: bool,
|
||||
/// This shard failed in sending a compute notification to the cloud control plane, and a retry is pending.
|
||||
pub is_pending_compute_notification: bool,
|
||||
/// A shard split is currently underway
|
||||
pub is_splitting: bool,
|
||||
}
|
||||
|
||||
/// Explicitly migrating a particular shard is a low level operation
|
||||
/// TODO: higher level "Reschedule tenant" operation where the request
|
||||
/// specifies some constraints, e.g. asking it to get off particular node(s)
|
||||
@@ -181,11 +209,8 @@ impl From<NodeSchedulingPolicy> for String {
|
||||
/// to create secondary locations.
|
||||
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
|
||||
pub enum PlacementPolicy {
|
||||
/// Cheapest way to attach a tenant: just one pageserver, no secondary
|
||||
Single,
|
||||
/// Production-ready way to attach a tenant: one attached pageserver and
|
||||
/// some number of secondaries.
|
||||
Double(usize),
|
||||
/// Normal live state: one attached pageserver and zero or more secondaries.
|
||||
Attached(usize),
|
||||
/// Create one secondary mode location. This is useful when onboarding
|
||||
/// a tenant, or for an idle tenant that we might want to bring online quickly.
|
||||
Secondary,
|
||||
@@ -207,14 +232,14 @@ mod test {
|
||||
/// Check stability of PlacementPolicy's serialization
|
||||
#[test]
|
||||
fn placement_policy_encoding() -> anyhow::Result<()> {
|
||||
let v = PlacementPolicy::Double(1);
|
||||
let v = PlacementPolicy::Attached(1);
|
||||
let encoded = serde_json::to_string(&v)?;
|
||||
assert_eq!(encoded, "{\"Double\":1}");
|
||||
assert_eq!(encoded, "{\"Attached\":1}");
|
||||
assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
|
||||
|
||||
let v = PlacementPolicy::Single;
|
||||
let v = PlacementPolicy::Detached;
|
||||
let encoded = serde_json::to_string(&v)?;
|
||||
assert_eq!(encoded, "\"Single\"");
|
||||
assert_eq!(encoded, "\"Detached\"");
|
||||
assert_eq!(serde_json::from_str::<PlacementPolicy>(&encoded)?, v);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1892,19 +1892,6 @@ class NeonCli(AbstractNeonCli):
|
||||
|
||||
return self.raw_cli(args, check_return_code=True)
|
||||
|
||||
def tenant_migrate(
|
||||
self, tenant_shard_id: TenantShardId, new_pageserver: int, timeout_secs: Optional[int]
|
||||
):
|
||||
args = [
|
||||
"tenant",
|
||||
"migrate",
|
||||
"--tenant-id",
|
||||
str(tenant_shard_id),
|
||||
"--id",
|
||||
str(new_pageserver),
|
||||
]
|
||||
return self.raw_cli(args, check_return_code=True, timeout=timeout_secs)
|
||||
|
||||
def start(self, check_return_code=True) -> "subprocess.CompletedProcess[str]":
|
||||
return self.raw_cli(["start"], check_return_code=check_return_code)
|
||||
|
||||
@@ -2156,7 +2143,7 @@ class NeonStorageController(MetricsGetter):
|
||||
"""
|
||||
response = self.request(
|
||||
"GET",
|
||||
f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}/locate",
|
||||
f"{self.env.storage_controller_api}/debug/v1/tenant/{tenant_id}/locate",
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
body = response.json()
|
||||
|
||||
@@ -158,6 +158,9 @@ class TenantShardId:
|
||||
def __str__(self):
|
||||
return f"{self.tenant_id}-{self.shard_number:02x}{self.shard_count:02x}"
|
||||
|
||||
def __repr__(self):
    # The debug representation is the same text as the string form.
    return str(self)
|
||||
|
||||
def _tuple(self) -> tuple[TenantId, int, int]:
|
||||
return (self.tenant_id, self.shard_number, self.shard_count)
|
||||
|
||||
|
||||
@@ -576,7 +576,7 @@ def test_slow_secondary_downloads(neon_env_builder: NeonEnvBuilder, via_controll
|
||||
timeline_id = TimelineId.generate()
|
||||
|
||||
env.neon_cli.create_tenant(
|
||||
tenant_id, timeline_id, conf=TENANT_CONF, placement_policy='{"Double":1}'
|
||||
tenant_id, timeline_id, conf=TENANT_CONF, placement_policy='{"Attached":1}'
|
||||
)
|
||||
|
||||
attached_to_id = env.storage_controller.locate(tenant_id)[0]["node_id"]
|
||||
|
||||
@@ -264,7 +264,7 @@ def test_sharding_split_smoke(
|
||||
destination = migrate_to_pageserver_ids.pop()
|
||||
|
||||
log.info(f"Migrating shard {migrate_shard} from {ps_id} to {destination}")
|
||||
env.neon_cli.tenant_migrate(migrate_shard, destination, timeout_secs=10)
|
||||
env.storage_controller.tenant_shard_migrate(migrate_shard, destination)
|
||||
|
||||
workload.validate()
|
||||
|
||||
@@ -299,7 +299,7 @@ def test_sharding_split_smoke(
|
||||
locations = pageserver.http_client().tenant_list_locations()
|
||||
shards_exist.extend(TenantShardId.parse(s[0]) for s in locations["tenant_shards"])
|
||||
|
||||
log.info("Shards after split: {shards_exist}")
|
||||
log.info(f"Shards after split: {shards_exist}")
|
||||
assert len(shards_exist) == split_shard_count
|
||||
|
||||
# Ensure post-split pageserver locations survive a restart (i.e. the child shards
|
||||
|
||||
Reference in New Issue
Block a user