diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs
index 56e3fac565..0ce8f6194b 100644
--- a/control_plane/src/attachment_service.rs
+++ b/control_plane/src/attachment_service.rs
@@ -3,7 +3,7 @@ use anyhow::anyhow;
use camino::Utf8PathBuf;
use hyper::{Method, StatusCode};
use pageserver_api::{
- models::{TenantCreateRequest, TimelineCreateRequest},
+ models::{TenantCreateRequest, TenantShardSplitRequest, TimelineCreateRequest},
shard::TenantShardId,
};
use postgres_connection::parse_host_port;
@@ -247,6 +247,15 @@ impl AttachmentService {
self.dispatch::<(), _>(Method::GET, format!("tenant/{tenant_id}/locate"), None)
}
+ #[instrument(skip(self), fields(%tenant_id, %new_shard_count))]
+ pub fn tenant_split(&self, tenant_id: TenantId, new_shard_count: u8) -> anyhow::Result<()> {
+ self.dispatch::<_, ()>(
+ Method::POST,
+ format!("tenant/{tenant_id}/shard_split"),
+ Some(TenantShardSplitRequest { new_shard_count }),
+ )
+ }
+
#[instrument(skip_all, fields(node_id=%req.node_id))]
pub fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> {
self.dispatch::<_, ()>(Method::POST, "node".to_string(), Some(req))
diff --git a/control_plane/src/bin/attachment_service.rs b/control_plane/src/bin/attachment_service.rs
index 27fde0aab2..4a9b461a9e 100644
--- a/control_plane/src/bin/attachment_service.rs
+++ b/control_plane/src/bin/attachment_service.rs
@@ -10,7 +10,8 @@ use hyper::{Body, Request, Response};
use hyper::{Method, StatusCode};
use pageserver_api::models::{
LocationConfig, LocationConfigMode, TenantConfig, TenantCreateRequest,
- TenantLocationConfigRequest, TimelineCreateRequest,
+ TenantLocationConfigRequest, TenantShardSplitRequest, TenantShardSplitResponse,
+ TimelineCreateRequest,
};
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId};
use reqwest::Client;
@@ -722,6 +723,121 @@ async fn handle_node_register(mut req: Request
) -> Result,
json_response(StatusCode::OK, ())
}
+async fn handle_tenant_shard_split(mut req: Request) -> Result, ApiError> {
+ let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
+ let split_req = json_request::(&mut req).await?;
+ let state = get_state(&req).inner.clone();
+ let mut locked = state.write().await;
+
+ let pageservers = locked.pageservers.clone();
+
+ let mut replacements = HashMap::new();
+
+ for (tenant_shard_id, shard) in locked
+ .tenants
+ .range_mut(TenantShardId::tenant_range(tenant_id))
+ {
+ if tenant_shard_id.shard_count == ShardCount(split_req.new_shard_count) {
+ tracing::warn!(
+ "Tenant shard {} already has shard count {}",
+ tenant_shard_id,
+ split_req.new_shard_count
+ );
+ continue;
+ }
+
+ let node_id = shard
+ .pageserver
+ .ok_or(ApiError::BadRequest(anyhow::anyhow!(
+ "Cannot split a tenant that is not attached"
+ )))?;
+
+ let node = pageservers
+ .get(&node_id)
+ .expect("Pageservers may not be deleted while referenced");
+
+ let client = Client::new();
+ let response = client
+ .request(
+ Method::POST,
+ format!("{}/tenant/{}/shard_split", node.base_url(), tenant_shard_id),
+ )
+ .json(&TenantShardSplitRequest {
+ new_shard_count: split_req.new_shard_count,
+ })
+ .send()
+ .await
+ .map_err(|e| {
+ ApiError::Conflict(format!("Failed to split {}: {}", tenant_shard_id, e))
+ })?;
+ // response.error_for_status().map_err(|e| {
+ // ApiError::Conflict(format!("Failed to split {}: {}", tenant_shard_id, e))
+ // })?;
+ let response: TenantShardSplitResponse = response.json().await.map_err(|e| {
+ ApiError::InternalServerError(anyhow::anyhow!(
+ "Malformed response from pageserver: {}",
+ e
+ ))
+ })?;
+
+ replacements.insert(*tenant_shard_id, response.new_shards);
+ }
+
+ // Replace all the shards we just split with their children
+ for (replaced, children) in replacements.into_iter() {
+ let (pageserver, generation, shard_ident, config) = {
+ let old_state = locked
+ .tenants
+ .remove(&replaced)
+ .expect("It was present, we just split it");
+ (
+ old_state.pageserver.unwrap(),
+ old_state.generation,
+ old_state.shard,
+ old_state.config.clone(),
+ )
+ };
+
+ for child in children {
+ let mut child_shard = shard_ident;
+ child_shard.number = child.shard_number;
+ child_shard.count = child.shard_count;
+
+ let mut child_observed: HashMap = HashMap::new();
+ child_observed.insert(
+ pageserver,
+ ObservedStateLocation {
+ conf: Some(LocationConfig {
+ mode: LocationConfigMode::AttachedSingle,
+ generation: Some(generation),
+ secondary_conf: None,
+ shard_number: child.shard_number.0,
+ shard_count: child.shard_count.0,
+ shard_stripe_size: shard_ident.stripe_size.0,
+ tenant_conf: config.clone(),
+ }),
+ },
+ );
+
+ locked.tenants.insert(
+ child,
+ TenantState {
+ tenant_shard_id: child,
+ shard: child_shard,
+ pageserver: Some(pageserver),
+ generation: generation,
+ observed: ObservedState {
+ locations: child_observed,
+ },
+ config: config.clone(),
+ },
+ );
+ }
+ }
+
+ json_response(StatusCode::OK, ())
+}
+
/// Status endpoint is just used for checking that our HTTP listener is up
async fn handle_status(_req: Request) -> Result, ApiError> {
json_response(StatusCode::OK, ())
@@ -743,6 +859,9 @@ fn make_router(persistent_state: PersistentState) -> RouterBuilder {
+ let tenant_id = get_tenant_id(matches, env)?;
+ let attachment_service = AttachmentService::from_env(env);
+ let old_shards = attachment_service.tenant_locate(tenant_id)?.shards;
+ let new_shard_count = old_shards.len() * 2;
+ if old_shards.len() > 127 {
+ bail!("Cannot split further");
+ }
+
+ attachment_service.tenant_split(tenant_id, new_shard_count as u8)?;
+ println!("Split {}->{}", old_shards.len(), new_shard_count);
+ }
Some(("status", matches)) => {
let tenant_id = get_tenant_id(matches, env)?;
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 68019c0a1e..e50ae605d6 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -191,6 +191,16 @@ pub struct TimelineCreateRequest {
pub pg_version: Option,
}
+#[derive(Serialize, Deserialize)]
+pub struct TenantShardSplitRequest {
+ pub new_shard_count: u8,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct TenantShardSplitResponse {
+ pub new_shards: Vec,
+}
+
/// Parameters that apply to all shards in a tenant. Used during tenant creation.
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs
index 75cb319f21..12f0811ae3 100644
--- a/libs/pageserver_api/src/shard.rs
+++ b/libs/pageserver_api/src/shard.rs
@@ -88,6 +88,16 @@ impl TenantShardId {
pub fn is_unsharded(&self) -> bool {
self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
}
+
+ /// Convenience for dropping the tenant_id and just getting the ShardIndex: this
+ /// is useful when logging from code that is already in a span that includes tenant ID, to
+ /// keep messages reasonably terse.
+ pub fn to_index(&self) -> ShardIndex {
+ ShardIndex {
+ shard_number: self.shard_number,
+ shard_count: self.shard_count,
+ }
+ }
}
/// Formatting helper
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 0b4f9538e0..0c2f148b86 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -15,10 +15,13 @@ use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::TenantDetails;
+use pageserver_api::models::TenantShardSplitRequest;
+use pageserver_api::models::TenantShardSplitResponse;
use pageserver_api::models::{
DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
TenantLoadRequest, TenantLocationConfigRequest,
};
+use pageserver_api::shard::ShardCount;
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use tenant_size_model::{SizeResult, StorageModel};
@@ -986,6 +989,25 @@ async fn tenant_size_handler(
)
}
+async fn tenant_shard_split_handler(
+ mut request: Request,
+ _cancel: CancellationToken,
+) -> Result, ApiError> {
+ let req: TenantShardSplitRequest = json_request(&mut request).await?;
+
+ let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
+ let state = get_state(&request);
+ let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
+
+ let new_shards = state
+ .tenant_manager
+ .shard_split(tenant_shard_id, ShardCount(req.new_shard_count), &ctx)
+ .await
+ .map_err(ApiError::InternalServerError)?;
+
+ json_response(StatusCode::OK, TenantShardSplitResponse { new_shards })
+}
+
async fn layer_map_info_handler(
request: Request,
_cancel: CancellationToken,
@@ -1824,6 +1846,9 @@ pub fn make_router(
.put("/v1/tenant/config", |r| {
api_handler(r, update_tenant_config_handler)
})
+ .put("/v1/tenant/:tenant_shard_id/shard_split", |r| {
+ api_handler(r, tenant_shard_split_handler)
+ })
.get("/v1/tenant/:tenant_shard_id/config", |r| {
api_handler(r, get_tenant_config_handler)
})
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 3dc7226e48..f0378499e7 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -50,6 +50,7 @@ use self::metadata::TimelineMetadata;
use self::mgr::GetActiveTenantError;
use self::mgr::GetTenantError;
use self::mgr::TenantsMap;
+use self::remote_timeline_client::upload::upload_index_part;
use self::remote_timeline_client::RemoteTimelineClient;
use self::timeline::uninit::TimelineExclusionError;
use self::timeline::uninit::TimelineUninitMark;
@@ -2305,6 +2306,66 @@ impl Tenant {
pub(crate) fn get_generation(&self) -> Generation {
self.generation
}
+
+ pub(crate) async fn split_prepare(
+ &self,
+ child_shards: &Vec,
+ ) -> anyhow::Result<()> {
+ let timelines = self.timelines.lock().unwrap().clone();
+ for timeline in timelines.values() {
+ let Some(tl_client) = &timeline.remote_client else {
+ anyhow::bail!("Remote storage is mandatory");
+ };
+
+ let Some(remote_storage) = &self.remote_storage else {
+ anyhow::bail!("Remote storage is mandatory");
+ };
+
+ // TODO: some higher level should enforce that timeline creation/deletion does not
+ // happen concurrently with splits. This is impossible to safely coordinate locally
+ // within one single pageserver's view of the world.
+
+ // Upload an index from the parent: this is partly to provide freshness for the
+ // child tenants that will copy it, and partly for general ease-of-debugging: there will
+ // always be a parent shard index in the same generation as we wrote the child shard index.
+ tl_client.schedule_index_upload_for_file_changes()?;
+ tl_client.wait_completion().await?;
+
+ // Shut down the timeline's remote client: this means that the indices we write
+ // for child shards will not be invalidated by the parent shard deleting layers.
+ tl_client.shutdown().await?;
+
+ // Download methods can still be used after shutdown, as they don't flow through the remote client's
+ // queue.
+ // TODO: create a way for remote timeline client to give us a copy of the last IndexPart it uploaded
+ // without having to download it again.
+ // TODO: carry a cancellation token in here
+ let result = tl_client
+ .download_index_file(CancellationToken::new())
+ .instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))
+ .await?;
+ let index_part = match result {
+ MaybeDeletedIndexPart::Deleted(_) => {
+ anyhow::bail!("Timeline deletion happened concurrently with split")
+ }
+ MaybeDeletedIndexPart::IndexPart(p) => p,
+ };
+
+ for child_shard in child_shards {
+ upload_index_part(
+ &remote_storage,
+ child_shard,
+ &timeline.timeline_id,
+ self.generation,
+ &index_part,
+ &self.cancel,
+ )
+ .await?;
+ }
+ }
+
+ Ok(())
+ }
}
/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
@@ -3621,6 +3682,10 @@ impl Tenant {
Ok(())
}
+
+ pub(crate) fn get_tenant_conf(&self) -> TenantConfOpt {
+ self.tenant_conf.read().unwrap().tenant_conf.clone()
+ }
}
fn remove_timeline_and_uninit_mark(
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 6fb8e9613c..dd317df6ea 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -2,6 +2,7 @@
//! page server.
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
+use itertools::Itertools;
use pageserver_api::key::Key;
use pageserver_api::models::ShardParameters;
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId};
@@ -21,7 +22,7 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use remote_storage::GenericRemoteStorage;
-use utils::crashsafe;
+use utils::{completion, crashsafe};
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
@@ -617,8 +618,6 @@ pub(crate) async fn shutdown_all_tenants() {
}
async fn shutdown_all_tenants0(tenants: &std::sync::RwLock) {
- use utils::completion;
-
let mut join_set = JoinSet::new();
// Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
@@ -1113,6 +1112,112 @@ impl TenantManager {
.collect(),
}
}
+
+ #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), new_shard_count=%new_shard_count.0))]
+ pub(crate) async fn shard_split(
+ &self,
+ tenant_shard_id: TenantShardId,
+ new_shard_count: ShardCount,
+ ctx: &RequestContext,
+ ) -> anyhow::Result> {
+ let tenant = get_tenant(tenant_shard_id, true)?;
+
+ // Plan: identify what the new child shards will be
+ let effective_old_shard_count = std::cmp::max(tenant_shard_id.shard_count.0, 1);
+ if new_shard_count <= ShardCount(effective_old_shard_count) {
+ anyhow::bail!("Requested shard count is not an increase");
+ }
+ let expansion_factor = new_shard_count.0 / effective_old_shard_count;
+ if !(expansion_factor & (expansion_factor - 1) == 0) {
+ anyhow::bail!("Requested split is not a power of two");
+ }
+
+ // Key mapping is based on a round robin mapping of key hash modulo shard count,
+ // so our child shards are the ones which the same keys would map to.
+ let mut child_shards = Vec::new();
+ for shard_number in 0..ShardNumber(new_shard_count.0).0 {
+ if shard_number % effective_old_shard_count == tenant_shard_id.shard_number.0 {
+ child_shards.push(TenantShardId {
+ tenant_id: tenant_shard_id.tenant_id,
+ shard_number: ShardNumber(shard_number),
+ shard_count: new_shard_count,
+ })
+ }
+ }
+
+ let parent_shard_identity = tenant.shard_identity;
+ let parent_tenant_conf = tenant.get_tenant_conf();
+ let parent_generation = tenant.generation;
+
+ // TODO: write a unit test for this
+ tracing::info!(
+ "Shard {} splits into: {}",
+ tenant_shard_id.to_index(),
+ child_shards
+ .iter()
+ .map(|id| format!("{}", id.to_index()))
+ .join(",")
+ );
+
+ // Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
+ tenant.split_prepare(&child_shards).await?;
+
+ self.resources.deletion_queue_client.flush_advisory();
+
+ // Phase 2: Put the parent shard to InProgress and shut it down
+ drop(tenant);
+ let mut parent_slot_guard =
+ tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
+ match parent_slot_guard.get_old_value() {
+ Some(TenantSlot::Attached(t)) => {
+ let (_guard, progress) = completion::channel();
+ match t.shutdown(progress, false).await {
+ Ok(()) => {}
+ Err(other) => {
+ other.wait().await;
+ }
+ }
+ }
+ Some(TenantSlot::Secondary) => {}
+ Some(TenantSlot::InProgress(_)) => {
+ unreachable!()
+ }
+ None => {
+ // We don't actually need the parent shard to still be attached to do our work, but it's
+ // a weird enough situation that the caller probably didn't want us to continue working
+ // if they had detached the tenant they requested the split on.
+ anyhow::bail!("Detached parent shard in the middle of split!")
+ }
+ };
+ parent_slot_guard.drop_old_value()?;
+
+ // TODO: hardlink layers from the parent into the child shard directories so that they don't immediately re-download
+ // TODO: erase the dentries from the parent
+
+ // Phase 3: Spawn the child shards
+ for child_shard in &child_shards {
+ let mut child_shard_identity = parent_shard_identity;
+ child_shard_identity.count = child_shard.shard_count;
+ child_shard_identity.number = child_shard.shard_number;
+
+ let child_location_conf = LocationConf {
+ mode: LocationMode::Attached(AttachedLocationConfig {
+ generation: parent_generation,
+ attach_mode: AttachmentMode::Single,
+ }),
+ shard: child_shard_identity,
+ tenant_conf: parent_tenant_conf,
+ };
+
+ self.upsert_location(*child_shard, child_location_conf, None, ctx)
+ .await?;
+ }
+
+ // Phase 4: Release the InProgress on the parent shard
+ drop(parent_slot_guard);
+
+ Ok(child_shards)
+ }
}
#[derive(Debug, thiserror::Error)]
@@ -1999,8 +2104,6 @@ async fn remove_tenant_from_memory(
where
F: std::future::Future