mirror of
https://github.com/neondatabase/neon.git
synced 2026-07-05 21:20:37 +00:00
control plane: improve handling of stripe size
This commit is contained in:
@@ -3,7 +3,10 @@ use anyhow::anyhow;
|
||||
use camino::Utf8PathBuf;
|
||||
use hyper::{Method, StatusCode};
|
||||
use pageserver_api::{
|
||||
models::{TenantCreateRequest, TenantShardSplitRequest, TimelineCreateRequest, TimelineInfo},
|
||||
models::{
|
||||
ShardParameters, TenantCreateRequest, TenantShardSplitRequest, TimelineCreateRequest,
|
||||
TimelineInfo,
|
||||
},
|
||||
shard::TenantShardId,
|
||||
};
|
||||
use postgres_connection::parse_host_port;
|
||||
@@ -79,6 +82,7 @@ pub struct TenantLocateResponseShard {
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct TenantLocateResponse {
|
||||
pub shards: Vec<TenantLocateResponseShard>,
|
||||
pub shard_params: ShardParameters,
|
||||
}
|
||||
|
||||
impl AttachmentService {
|
||||
|
||||
@@ -9,7 +9,7 @@ use clap::Parser;
|
||||
use hyper::{Body, Request, Response};
|
||||
use hyper::{Method, StatusCode};
|
||||
use pageserver_api::models::{
|
||||
LocationConfig, LocationConfigMode, TenantConfig, TenantCreateRequest,
|
||||
LocationConfig, LocationConfigMode, ShardParameters, TenantConfig, TenantCreateRequest,
|
||||
TenantLocationConfigRequest, TenantShardSplitRequest, TenantShardSplitResponse,
|
||||
TimelineCreateRequest, TimelineInfo,
|
||||
};
|
||||
@@ -514,8 +514,9 @@ async fn handle_tenant_create(mut req: Request<Body>) -> Result<Response<Body>,
|
||||
let mut locked = state.write().await;
|
||||
|
||||
tracing::info!(
|
||||
"Creating tenant {}, have {} pageservers",
|
||||
"Creating tenant {}, shard_count={:?}, have {} pageservers",
|
||||
create_req.new_tenant_id,
|
||||
create_req.shard_parameters.count,
|
||||
locked.pageservers.len()
|
||||
);
|
||||
|
||||
@@ -548,6 +549,7 @@ async fn handle_tenant_create(mut req: Request<Body>) -> Result<Response<Body>,
|
||||
shard_number,
|
||||
shard_count: create_req.shard_parameters.count,
|
||||
};
|
||||
tracing::info!("Creating shard {tenant_shard_id}...");
|
||||
|
||||
use std::collections::btree_map::Entry;
|
||||
match locked.tenants.entry(tenant_shard_id) {
|
||||
@@ -685,6 +687,7 @@ async fn handle_tenant_locate(req: Request<Body>) -> Result<Response<Body>, ApiE
|
||||
let pageservers = locked.pageservers.clone();
|
||||
|
||||
let mut result = Vec::new();
|
||||
let mut shard_params: Option<ShardParameters> = None;
|
||||
|
||||
for (tenant_shard_id, shard) in locked
|
||||
.tenants
|
||||
@@ -708,9 +711,40 @@ async fn handle_tenant_locate(req: Request<Body>) -> Result<Response<Body>, ApiE
|
||||
listen_pg_addr: node.listen_pg_addr.clone(),
|
||||
listen_pg_port: node.listen_pg_port,
|
||||
});
|
||||
|
||||
match &shard_params {
|
||||
None => {
|
||||
shard_params = Some(ShardParameters {
|
||||
stripe_size: Some(shard.shard.stripe_size),
|
||||
count: shard.shard.count,
|
||||
});
|
||||
}
|
||||
Some(params) => {
|
||||
if params.stripe_size != Some(shard.shard.stripe_size) {
|
||||
// This should never happen. We enforce at runtime because it's simpler than
|
||||
// adding an extra per-tenant data structure to store the things that should be the same
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Inconsistent shard stripe size parameters!"
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
json_response(StatusCode::OK, TenantLocateResponse { shards: result })
|
||||
if result.is_empty() {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("No shards for this tenant ID found").into(),
|
||||
));
|
||||
}
|
||||
let shard_params = shard_params.expect("result is non-empty, therefore this is set");
|
||||
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
TenantLocateResponse {
|
||||
shards: result,
|
||||
shard_params,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_node_register(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
@@ -774,7 +808,7 @@ async fn handle_tenant_shard_split(mut req: Request<Body>) -> Result<Response<Bo
|
||||
let client = Client::new();
|
||||
let response = client
|
||||
.request(
|
||||
Method::POST,
|
||||
Method::PUT,
|
||||
format!("{}/tenant/{}/shard_split", node.base_url(), tenant_shard_id),
|
||||
)
|
||||
.json(&TenantShardSplitRequest {
|
||||
@@ -788,6 +822,9 @@ async fn handle_tenant_shard_split(mut req: Request<Body>) -> Result<Response<Bo
|
||||
// response.error_for_status().map_err(|e| {
|
||||
// ApiError::Conflict(format!("Failed to split {}: {}", tenant_shard_id, e))
|
||||
// })?;
|
||||
response.error_for_status_ref().map_err(|e| {
|
||||
ApiError::Conflict(format!("Failed to split {}: {}", tenant_shard_id, e))
|
||||
})?;
|
||||
let response: TenantShardSplitResponse = response.json().await.map_err(|e| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Malformed response from pageserver: {}",
|
||||
@@ -799,6 +836,9 @@ async fn handle_tenant_shard_split(mut req: Request<Body>) -> Result<Response<Bo
|
||||
}
|
||||
|
||||
// Replace all the shards we just split with their children
|
||||
let mut response = TenantShardSplitResponse {
|
||||
new_shards: Vec::new(),
|
||||
};
|
||||
for (replaced, children) in replacements.into_iter() {
|
||||
let (pageserver, generation, shard_ident, config) = {
|
||||
let old_state = locked
|
||||
@@ -847,10 +887,12 @@ async fn handle_tenant_shard_split(mut req: Request<Body>) -> Result<Response<Bo
|
||||
config: config.clone(),
|
||||
},
|
||||
);
|
||||
|
||||
response.new_shards.push(child);
|
||||
}
|
||||
}
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
json_response(StatusCode::OK, response)
|
||||
}
|
||||
|
||||
/// Status endpoint is just used for checking that our HTTP listener is up
|
||||
|
||||
@@ -427,7 +427,7 @@ async fn handle_tenant(
|
||||
let shard_count: u8 = create_match
|
||||
.get_one::<u8>("shard-count")
|
||||
.cloned()
|
||||
.unwrap_or(1);
|
||||
.unwrap_or(0);
|
||||
|
||||
let shard_stripe_size: Option<u32> =
|
||||
create_match.get_one::<u32>("shard-stripe-size").cloned();
|
||||
@@ -909,9 +909,8 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
)?;
|
||||
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
let pageservers = attachment_service
|
||||
.tenant_locate(endpoint.tenant_id)
|
||||
.await?
|
||||
let locate_result = attachment_service.tenant_locate(endpoint.tenant_id).await?;
|
||||
let pageservers = locate_result
|
||||
.shards
|
||||
.into_iter()
|
||||
.map(|shard| {
|
||||
@@ -922,6 +921,7 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let stripe_size = locate_result.shard_params.stripe_size.map(|s| s.0 as usize);
|
||||
|
||||
let ps_conf = env.get_pageserver_conf(pageserver_id)?;
|
||||
let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) {
|
||||
@@ -934,7 +934,13 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
|
||||
println!("Starting existing endpoint {endpoint_id}...");
|
||||
endpoint
|
||||
.start(&auth_token, safekeepers, pageservers, remote_ext_config)
|
||||
.start(
|
||||
&auth_token,
|
||||
safekeepers,
|
||||
pageservers,
|
||||
remote_ext_config,
|
||||
stripe_size,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
"reconfigure" => {
|
||||
|
||||
@@ -466,6 +466,7 @@ impl Endpoint {
|
||||
safekeepers: Vec<NodeId>,
|
||||
pageservers: Vec<(Host, u16)>,
|
||||
remote_ext_config: Option<&String>,
|
||||
shard_stripe_size: Option<usize>,
|
||||
) -> Result<()> {
|
||||
if self.status() == "running" {
|
||||
anyhow::bail!("The endpoint is already running");
|
||||
@@ -529,6 +530,7 @@ impl Endpoint {
|
||||
safekeeper_connstrings,
|
||||
storage_auth_token: auth_token.clone(),
|
||||
remote_extensions,
|
||||
shard_stripe_size,
|
||||
};
|
||||
let spec_path = self.endpoint_path().join("spec.json");
|
||||
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
|
||||
|
||||
Reference in New Issue
Block a user