diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs
index 971a6aa4da..5e88231065 100644
--- a/control_plane/attachment_service/src/http.rs
+++ b/control_plane/attachment_service/src/http.rs
@@ -2,7 +2,7 @@ use crate::reconciler::ReconcileError;
 use crate::service::Service;
 use hyper::StatusCode;
 use hyper::{Body, Request, Response};
-use pageserver_api::models::{TenantCreateRequest, TimelineCreateRequest};
+use pageserver_api::models::{TenantCreateRequest, TenantShardSplitRequest, TimelineCreateRequest};
 use pageserver_api::shard::TenantShardId;
 use std::sync::Arc;
 use utils::http::endpoint::request_span;
@@ -129,6 +129,20 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
     json_response(StatusCode::OK, state.service.node_configure(config_req)?)
 }
 
+async fn handle_tenant_shard_split(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
+    let split_req = json_request::<TenantShardSplitRequest>(&mut req).await?;
+    let state = get_state(&req);
+
+    json_response(
+        StatusCode::OK,
+        state
+            .service
+            .tenant_shard_split(tenant_id, split_req)
+            .await?,
+    )
+}
+
 async fn handle_tenant_shard_migrate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
     let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
     let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
@@ -172,6 +186,9 @@ pub fn make_router(service: Arc<Service>) -> RouterBuilder<hyper::Body, ApiError> {
+        .put("/tenant/:tenant_id/shard_split", |r| {
+            request_span(r, handle_tenant_shard_split)
+        })
diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs
--- a/control_plane/attachment_service/src/service.rs
+++ b/control_plane/attachment_service/src/service.rs
@@ ... @@ impl Service {
+    pub(crate) async fn tenant_shard_split(
+        &self,
+        tenant_id: TenantId,
+        split_req: TenantShardSplitRequest,
+    ) -> Result<TenantShardSplitResponse, ApiError> {
+        let mut policy = None;
+        let (targets, compute_hook) = {
+            let mut locked = self.inner.write().unwrap();
+
+            let pageservers = locked.nodes.clone();
+
+            let mut targets = Vec::new();
+
+            for (tenant_shard_id, shard) in locked
+                .tenants
+                .range_mut(TenantShardId::tenant_range(tenant_id))
+            {
+                if policy.is_none() {
+                    policy = Some(shard.policy.clone());
+                }
+
+                if tenant_shard_id.shard_count == ShardCount(split_req.new_shard_count) {
+                    tracing::warn!(
+                        "Tenant shard {} already has shard count {}",
+                        tenant_shard_id,
+                        split_req.new_shard_count
+                    );
+                    continue;
+                }
+
+                let node_id = shard
+                    .intent
+                    .attached
+                    .ok_or(ApiError::BadRequest(anyhow::anyhow!(
+                        "Cannot split a tenant that is not attached"
+                    )))?;
+
+                let node = pageservers
+                    .get(&node_id)
+                    .expect("Pageservers may not be deleted while referenced");
+
+                // TODO: if any reconciliation is currently in progress for this shard, wait for it.
+
+                targets.push((*tenant_shard_id, node.clone()));
+            }
+            (targets, locked.compute_hook.clone())
+        };
+
+        let mut replacements = HashMap::new();
+        for (tenant_shard_id, node) in targets {
+            let client = Client::new();
+            let response = client
+                .request(
+                    Method::PUT,
+                    format!("{}/tenant/{}/shard_split", node.base_url(), tenant_shard_id),
+                )
+                .json(&TenantShardSplitRequest {
+                    new_shard_count: split_req.new_shard_count,
+                })
+                .send()
+                .await
+                .map_err(|e| {
+                    ApiError::Conflict(format!("Failed to split {}: {}", tenant_shard_id, e))
+                })?;
+            response.error_for_status_ref().map_err(|e| {
+                ApiError::Conflict(format!("Failed to split {}: {}", tenant_shard_id, e))
+            })?;
+            let response: TenantShardSplitResponse = response.json().await.map_err(|e| {
+                ApiError::InternalServerError(anyhow::anyhow!(
+                    "Malformed response from pageserver: {}",
+                    e
+                ))
+            })?;
+
+            tracing::info!(
+                "Split {} into {}",
+                tenant_shard_id,
+                response
+                    .new_shards
+                    .iter()
+                    .map(|s| format!("{:?}", s))
+                    .collect::<Vec<_>>()
+                    .join(",")
+            );
+
+            replacements.insert(tenant_shard_id, response.new_shards);
+        }
+
+        // TODO: concurrency: we're dropping the state lock while issuing split API calls.
+        // We should add some marker to the TenantState that causes any other change
+        // to refuse until the split is complete. This will be related to a persistent
+        // splitting marker that will ensure resume after crash.
+
+        // Replace all the shards we just split with their children
+        let mut response = TenantShardSplitResponse {
+            new_shards: Vec::new(),
+        };
+        let mut child_locations = Vec::new();
+        {
+            let mut locked = self.inner.write().unwrap();
+            for (replaced, children) in replacements.into_iter() {
+                let (pageserver, generation, shard_ident, config) = {
+                    let old_state = locked
+                        .tenants
+                        .remove(&replaced)
+                        .expect("It was present, we just split it");
+                    (
+                        old_state.intent.attached.unwrap(),
+                        old_state.generation,
+                        old_state.shard,
+                        old_state.config.clone(),
+                    )
+                };
+
+                locked.tenants.remove(&replaced);
+
+                for child in children {
+                    let mut child_shard = shard_ident;
+                    child_shard.number = child.shard_number;
+                    child_shard.count = child.shard_count;
+
+                    let mut child_observed: HashMap<NodeId, ObservedStateLocation> = HashMap::new();
+                    child_observed.insert(
+                        pageserver,
+                        ObservedStateLocation {
+                            conf: Some(attached_location_conf(generation, &child_shard, &config)),
+                        },
+                    );
+
+                    let mut child_state = TenantState::new(
+                        child,
+                        child_shard,
+                        policy
+                            .clone()
+                            .expect("We set this if any replacements are pushed"),
+                    );
+                    child_state.intent = IntentState::single(Some(pageserver));
+                    child_state.observed = ObservedState {
+                        locations: child_observed,
+                    };
+                    child_state.generation = generation;
+                    child_state.config = config.clone();
+
+                    child_locations.push((child, pageserver));
+
+                    locked.tenants.insert(child, child_state);
+                    response.new_shards.push(child);
+                }
+            }
+        }
+
+        for (child_id, child_ps) in child_locations {
+            if let Err(e) = compute_hook.notify(child_id, child_ps).await {
+                tracing::warn!("Failed to update compute of {}->{} during split, proceeding anyway to complete split ({e})",
+                    child_id, child_ps);
+            }
+        }
+        Ok(response)
+    }
+
     pub(crate) async fn tenant_shard_migrate(
         &self,
         tenant_shard_id: TenantShardId,
diff --git a/control_plane/attachment_service/src/tenant_state.rs b/control_plane/attachment_service/src/tenant_state.rs
index 81f8a9f47a..bc66e804fa 100644
--- a/control_plane/attachment_service/src/tenant_state.rs
+++ b/control_plane/attachment_service/src/tenant_state.rs
@@ -133,6 +133,13 @@ impl IntentState {
         result
     }
 
+    pub(crate) fn single(node_id: Option<NodeId>) -> Self {
+        Self {
+            attached: node_id,
+            secondary: vec![],
+        }
+    }
+
     /// When a node goes offline, we update intents to avoid using it
     /// as their attached pageserver.
     ///
diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs
index ef89711a5a..53125a39e9 100644
--- a/control_plane/src/attachment_service.rs
+++ b/control_plane/src/attachment_service.rs
@@ -3,7 +3,10 @@ use anyhow::anyhow;
 use camino::Utf8PathBuf;
 use hyper::{Method, StatusCode};
 use pageserver_api::{
-    models::{ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo},
+    models::{
+        ShardParameters, TenantCreateRequest, TenantShardSplitRequest, TenantShardSplitResponse,
+        TimelineCreateRequest, TimelineInfo,
+    },
     shard::TenantShardId,
 };
 use postgres_connection::parse_host_port;
@@ -344,6 +347,20 @@ impl AttachmentService {
         .await
     }
 
+    #[instrument(skip(self), fields(%tenant_id, %new_shard_count))]
+    pub async fn tenant_split(
+        &self,
+        tenant_id: TenantId,
+        new_shard_count: u8,
+    ) -> anyhow::Result<TenantShardSplitResponse> {
+        self.dispatch(
+            Method::PUT,
+            format!("tenant/{tenant_id}/shard_split"),
+            Some(TenantShardSplitRequest { new_shard_count }),
+        )
+        .await
+    }
+
     #[instrument(skip_all, fields(node_id=%req.node_id))]
     pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> {
         self.dispatch::<_, ()>(Method::POST, "node".to_string(), Some(req))
diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs
index 329a06780b..295469882a 100644
--- a/control_plane/src/bin/neon_local.rs
+++ b/control_plane/src/bin/neon_local.rs
@@ -572,6 +572,26 @@ async fn handle_tenant(
             println!("{tenant_table}");
             println!("{shard_table}");
         }
+        Some(("shard-split", matches)) => {
+            let tenant_id = get_tenant_id(matches, env)?;
+            let shard_count: u8 = matches.get_one::<u8>("shard-count").cloned().unwrap_or(0);
+
+            let attachment_service = AttachmentService::from_env(env);
+            let result = attachment_service
+                .tenant_split(tenant_id, shard_count)
+                .await?;
+            println!(
+                "Split tenant {} into shards {}",
+                tenant_id,
+                result
+                    .new_shards
+                    .iter()
+                    .map(|s| format!("{:?}", s))
+                    .collect::<Vec<_>>()
+                    .join(",")
+            );
+        }
+
         Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name),
         None => bail!("no tenant subcommand provided"),
     }
@@ -1514,6 +1534,11 @@ fn cli() -> Command {
             .subcommand(Command::new("status")
                 .about("Human readable summary of the tenant's shards and attachment locations")
                 .arg(tenant_id_arg.clone()))
+            .subcommand(Command::new("shard-split")
+                .about("Increase the number of shards in the tenant")
+                .arg(tenant_id_arg.clone())
+                .arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)"))
+            )
         )
         .subcommand(
             Command::new("pageserver")
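
Usage note (not part of the patch): the sketch below shows one way the new split API might be driven programmatically through the control_plane client added above. It is a hypothetical example, not code from this diff; it assumes a LocalEnv is already loaded and relies only on names the patch itself introduces or uses (AttachmentService::from_env, tenant_split, TenantShardSplitResponse::new_shards).

    // Hypothetical driver, assuming the patch above is applied:
    // split a tenant into four shards via the storage controller and
    // print the child shard ids, mirroring the neon_local output format.
    use control_plane::attachment_service::AttachmentService;
    use control_plane::local_env::LocalEnv;
    use utils::id::TenantId;

    async fn split_to_four(env: &LocalEnv, tenant_id: TenantId) -> anyhow::Result<()> {
        let attachment_service = AttachmentService::from_env(env);
        // Issues PUT tenant/:tenant_id/shard_split with {"new_shard_count": 4}
        let response = attachment_service.tenant_split(tenant_id, 4).await?;
        for shard in response.new_shards {
            // TenantShardId printed with Debug, as in the CLI handler above
            println!("new shard: {shard:?}");
        }
        Ok(())
    }

The equivalent CLI invocation added in neon_local.rs would be along the lines of `neon_local tenant shard-split --tenant-id <tenant_id> --shard-count 4` (assuming tenant_id_arg is the usual --tenant-id flag), which prints the child shard ids on success.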