pageserver: implement shard splitting

This commit is contained in:
John Spray
2023-12-08 12:56:44 +00:00
parent 9cd72caabf
commit fc2f9fa3fe
10 changed files with 366 additions and 10 deletions

View File

@@ -3,7 +3,7 @@ use anyhow::anyhow;
use camino::Utf8PathBuf;
use hyper::{Method, StatusCode};
use pageserver_api::{
models::{TenantCreateRequest, TimelineCreateRequest},
models::{TenantCreateRequest, TenantShardSplitRequest, TimelineCreateRequest},
shard::TenantShardId,
};
use postgres_connection::parse_host_port;
@@ -247,6 +247,15 @@ impl AttachmentService {
self.dispatch::<(), _>(Method::GET, format!("tenant/{tenant_id}/locate"), None)
}
#[instrument(skip(self), fields(%tenant_id, %new_shard_count))]
pub fn tenant_split(&self, tenant_id: TenantId, new_shard_count: u8) -> anyhow::Result<()> {
self.dispatch::<_, ()>(
Method::POST,
format!("tenant/{tenant_id}/shard_split"),
Some(TenantShardSplitRequest { new_shard_count }),
)
}
#[instrument(skip_all, fields(node_id=%req.node_id))]
pub fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> {
self.dispatch::<_, ()>(Method::POST, "node".to_string(), Some(req))

View File

@@ -10,7 +10,8 @@ use hyper::{Body, Request, Response};
use hyper::{Method, StatusCode};
use pageserver_api::models::{
LocationConfig, LocationConfigMode, TenantConfig, TenantCreateRequest,
TenantLocationConfigRequest, TimelineCreateRequest,
TenantLocationConfigRequest, TenantShardSplitRequest, TenantShardSplitResponse,
TimelineCreateRequest,
};
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId};
use reqwest::Client;
@@ -722,6 +723,121 @@ async fn handle_node_register(mut req: Request<Body>) -> Result<Response<Body>,
json_response(StatusCode::OK, ())
}
async fn handle_tenant_shard_split(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let split_req = json_request::<TenantShardSplitRequest>(&mut req).await?;
let state = get_state(&req).inner.clone();
let mut locked = state.write().await;
let pageservers = locked.pageservers.clone();
let mut replacements = HashMap::new();
for (tenant_shard_id, shard) in locked
.tenants
.range_mut(TenantShardId::tenant_range(tenant_id))
{
if tenant_shard_id.shard_count == ShardCount(split_req.new_shard_count) {
tracing::warn!(
"Tenant shard {} already has shard count {}",
tenant_shard_id,
split_req.new_shard_count
);
continue;
}
let node_id = shard
.pageserver
.ok_or(ApiError::BadRequest(anyhow::anyhow!(
"Cannot split a tenant that is not attached"
)))?;
let node = pageservers
.get(&node_id)
.expect("Pageservers may not be deleted while referenced");
let client = Client::new();
let response = client
.request(
Method::POST,
format!("{}/tenant/{}/shard_split", node.base_url(), tenant_shard_id),
)
.json(&TenantShardSplitRequest {
new_shard_count: split_req.new_shard_count,
})
.send()
.await
.map_err(|e| {
ApiError::Conflict(format!("Failed to split {}: {}", tenant_shard_id, e))
})?;
// response.error_for_status().map_err(|e| {
// ApiError::Conflict(format!("Failed to split {}: {}", tenant_shard_id, e))
// })?;
let response: TenantShardSplitResponse = response.json().await.map_err(|e| {
ApiError::InternalServerError(anyhow::anyhow!(
"Malformed response from pageserver: {}",
e
))
})?;
replacements.insert(*tenant_shard_id, response.new_shards);
}
// Replace all the shards we just split with their children
for (replaced, children) in replacements.into_iter() {
let (pageserver, generation, shard_ident, config) = {
let old_state = locked
.tenants
.remove(&replaced)
.expect("It was present, we just split it");
(
old_state.pageserver.unwrap(),
old_state.generation,
old_state.shard,
old_state.config.clone(),
)
};
for child in children {
let mut child_shard = shard_ident;
child_shard.number = child.shard_number;
child_shard.count = child.shard_count;
let mut child_observed: HashMap<NodeId, ObservedStateLocation> = HashMap::new();
child_observed.insert(
pageserver,
ObservedStateLocation {
conf: Some(LocationConfig {
mode: LocationConfigMode::AttachedSingle,
generation: Some(generation),
secondary_conf: None,
shard_number: child.shard_number.0,
shard_count: child.shard_count.0,
shard_stripe_size: shard_ident.stripe_size.0,
tenant_conf: config.clone(),
}),
},
);
locked.tenants.insert(
child,
TenantState {
tenant_shard_id: child,
shard: child_shard,
pageserver: Some(pageserver),
generation: generation,
observed: ObservedState {
locations: child_observed,
},
config: config.clone(),
},
);
}
}
json_response(StatusCode::OK, ())
}
/// Status endpoint is just used for checking that our HTTP listener is up
async fn handle_status(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(StatusCode::OK, ())
@@ -743,6 +859,9 @@ fn make_router(persistent_state: PersistentState) -> RouterBuilder<hyper::Body,
.get("/tenant/:tenant_id/locate", |r| {
request_span(r, handle_tenant_locate)
})
.put("/tenant/:tenant_id/shard_split", |r| {
request_span(r, handle_tenant_shard_split)
})
}
#[tokio::main]

View File

@@ -515,6 +515,18 @@ async fn handle_tenant(
migrate_tenant(env, tenant_shard_id, new_pageserver).await?;
println!("tenant {tenant_shard_id} migrated to {}", new_pageserver_id);
}
Some(("split", matches)) => {
let tenant_id = get_tenant_id(matches, env)?;
let attachment_service = AttachmentService::from_env(env);
let old_shards = attachment_service.tenant_locate(tenant_id)?.shards;
let new_shard_count = old_shards.len() * 2;
if old_shards.len() > 127 {
bail!("Cannot split further");
}
attachment_service.tenant_split(tenant_id, new_shard_count as u8)?;
println!("Split {}->{}", old_shards.len(), new_shard_count);
}
Some(("status", matches)) => {
let tenant_id = get_tenant_id(matches, env)?;

View File

@@ -191,6 +191,16 @@ pub struct TimelineCreateRequest {
pub pg_version: Option<u32>,
}
#[derive(Serialize, Deserialize)]
pub struct TenantShardSplitRequest {
pub new_shard_count: u8,
}
#[derive(Serialize, Deserialize)]
pub struct TenantShardSplitResponse {
pub new_shards: Vec<TenantShardId>,
}
/// Parameters that apply to all shards in a tenant. Used during tenant creation.
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]

View File

@@ -88,6 +88,16 @@ impl TenantShardId {
pub fn is_unsharded(&self) -> bool {
self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
}
/// Convenience for dropping the tenant_id and just getting the ShardIndex: this
/// is useful when logging from code that is already in a span that includes tenant ID, to
/// keep messages reasonably terse.
pub fn to_index(&self) -> ShardIndex {
ShardIndex {
shard_number: self.shard_number,
shard_count: self.shard_count,
}
}
}
/// Formatting helper

View File

@@ -15,10 +15,13 @@ use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::TenantDetails;
use pageserver_api::models::TenantShardSplitRequest;
use pageserver_api::models::TenantShardSplitResponse;
use pageserver_api::models::{
DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
TenantLoadRequest, TenantLocationConfigRequest,
};
use pageserver_api::shard::ShardCount;
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use tenant_size_model::{SizeResult, StorageModel};
@@ -986,6 +989,25 @@ async fn tenant_size_handler(
)
}
async fn tenant_shard_split_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let req: TenantShardSplitRequest = json_request(&mut request).await?;
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let state = get_state(&request);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let new_shards = state
.tenant_manager
.shard_split(tenant_shard_id, ShardCount(req.new_shard_count), &ctx)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, TenantShardSplitResponse { new_shards })
}
async fn layer_map_info_handler(
request: Request<Body>,
_cancel: CancellationToken,
@@ -1824,6 +1846,9 @@ pub fn make_router(
.put("/v1/tenant/config", |r| {
api_handler(r, update_tenant_config_handler)
})
.put("/v1/tenant/:tenant_shard_id/shard_split", |r| {
api_handler(r, tenant_shard_split_handler)
})
.get("/v1/tenant/:tenant_shard_id/config", |r| {
api_handler(r, get_tenant_config_handler)
})

View File

@@ -50,6 +50,7 @@ use self::metadata::TimelineMetadata;
use self::mgr::GetActiveTenantError;
use self::mgr::GetTenantError;
use self::mgr::TenantsMap;
use self::remote_timeline_client::upload::upload_index_part;
use self::remote_timeline_client::RemoteTimelineClient;
use self::timeline::uninit::TimelineExclusionError;
use self::timeline::uninit::TimelineUninitMark;
@@ -2305,6 +2306,66 @@ impl Tenant {
pub(crate) fn get_generation(&self) -> Generation {
self.generation
}
pub(crate) async fn split_prepare(
&self,
child_shards: &Vec<TenantShardId>,
) -> anyhow::Result<()> {
let timelines = self.timelines.lock().unwrap().clone();
for timeline in timelines.values() {
let Some(tl_client) = &timeline.remote_client else {
anyhow::bail!("Remote storage is mandatory");
};
let Some(remote_storage) = &self.remote_storage else {
anyhow::bail!("Remote storage is mandatory");
};
// TODO: some higher level should enforce that timeline creation/deletion does not
// happen concurrently with splits. This is impossible to safely coordinate locally
// within one single pageserver's view of the world.
// Upload an index from the parent: this is partly to provide freshness for the
// child tenants that will copy it, and partly for general ease-of-debugging: there will
// always be a parent shard index in the same generation as we wrote the child shard index.
tl_client.schedule_index_upload_for_file_changes()?;
tl_client.wait_completion().await?;
// Shut down the timeline's remote client: this means that the indices we write
// for child shards will not be invalidated by the parent shard deleting layers.
tl_client.shutdown().await?;
// Download methods can still be used after shutdown, as they don't flow through the remote client's
// queue.
// TODO: create a way for remote timeline client to give us a copy of the last IndexPart it uploaded
// without having to download it again.
// TODO: carry a cancellation token in here
let result = tl_client
.download_index_file(CancellationToken::new())
.instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))
.await?;
let index_part = match result {
MaybeDeletedIndexPart::Deleted(_) => {
anyhow::bail!("Timeline deletion happened concurrently with split")
}
MaybeDeletedIndexPart::IndexPart(p) => p,
};
for child_shard in child_shards {
upload_index_part(
&remote_storage,
child_shard,
&timeline.timeline_id,
self.generation,
&index_part,
&self.cancel,
)
.await?;
}
}
Ok(())
}
}
/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
@@ -3621,6 +3682,10 @@ impl Tenant {
Ok(())
}
pub(crate) fn get_tenant_conf(&self) -> TenantConfOpt {
self.tenant_conf.read().unwrap().tenant_conf.clone()
}
}
fn remove_timeline_and_uninit_mark(

View File

@@ -2,6 +2,7 @@
//! page server.
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
use itertools::Itertools;
use pageserver_api::key::Key;
use pageserver_api::models::ShardParameters;
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId};
@@ -21,7 +22,7 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use remote_storage::GenericRemoteStorage;
use utils::crashsafe;
use utils::{completion, crashsafe};
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
@@ -617,8 +618,6 @@ pub(crate) async fn shutdown_all_tenants() {
}
async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
use utils::completion;
let mut join_set = JoinSet::new();
// Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
@@ -1113,6 +1112,112 @@ impl TenantManager {
.collect(),
}
}
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), new_shard_count=%new_shard_count.0))]
pub(crate) async fn shard_split(
&self,
tenant_shard_id: TenantShardId,
new_shard_count: ShardCount,
ctx: &RequestContext,
) -> anyhow::Result<Vec<TenantShardId>> {
let tenant = get_tenant(tenant_shard_id, true)?;
// Plan: identify what the new child shards will be
let effective_old_shard_count = std::cmp::max(tenant_shard_id.shard_count.0, 1);
if new_shard_count <= ShardCount(effective_old_shard_count) {
anyhow::bail!("Requested shard count is not an increase");
}
let expansion_factor = new_shard_count.0 / effective_old_shard_count;
if !(expansion_factor & (expansion_factor - 1) == 0) {
anyhow::bail!("Requested split is not a power of two");
}
// Key mapping is based on a round robin mapping of key hash modulo shard count,
// so our child shards are the ones which the same keys would map to.
let mut child_shards = Vec::new();
for shard_number in 0..ShardNumber(new_shard_count.0).0 {
if shard_number % effective_old_shard_count == tenant_shard_id.shard_number.0 {
child_shards.push(TenantShardId {
tenant_id: tenant_shard_id.tenant_id,
shard_number: ShardNumber(shard_number),
shard_count: new_shard_count,
})
}
}
let parent_shard_identity = tenant.shard_identity;
let parent_tenant_conf = tenant.get_tenant_conf();
let parent_generation = tenant.generation;
// TODO: write a unit test for this
tracing::info!(
"Shard {} splits into: {}",
tenant_shard_id.to_index(),
child_shards
.iter()
.map(|id| format!("{}", id.to_index()))
.join(",")
);
// Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
tenant.split_prepare(&child_shards).await?;
self.resources.deletion_queue_client.flush_advisory();
// Phase 2: Put the parent shard to InProgress and shut it down
drop(tenant);
let mut parent_slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
match parent_slot_guard.get_old_value() {
Some(TenantSlot::Attached(t)) => {
let (_guard, progress) = completion::channel();
match t.shutdown(progress, false).await {
Ok(()) => {}
Err(other) => {
other.wait().await;
}
}
}
Some(TenantSlot::Secondary) => {}
Some(TenantSlot::InProgress(_)) => {
unreachable!()
}
None => {
// We don't actually need the parent shard to still be attached to do our work, but it's
// a weird enough situation that the caller probably didn't want us to continue working
// if they had detached the tenant they requested the split on.
anyhow::bail!("Detached parent shard in the middle of split!")
}
};
parent_slot_guard.drop_old_value()?;
// TODO: hardlink layers from the parent into the child shard directories so that they don't immediately re-download
// TODO: erase the dentries from the parent
// Phase 3: Spawn the child shards
for child_shard in &child_shards {
let mut child_shard_identity = parent_shard_identity;
child_shard_identity.count = child_shard.shard_count;
child_shard_identity.number = child_shard.shard_number;
let child_location_conf = LocationConf {
mode: LocationMode::Attached(AttachedLocationConfig {
generation: parent_generation,
attach_mode: AttachmentMode::Single,
}),
shard: child_shard_identity,
tenant_conf: parent_tenant_conf,
};
self.upsert_location(*child_shard, child_location_conf, None, ctx)
.await?;
}
// Phase 4: Release the InProgress on the parent shard
drop(parent_slot_guard);
Ok(child_shards)
}
}
#[derive(Debug, thiserror::Error)]
@@ -1999,8 +2104,6 @@ async fn remove_tenant_from_memory<V, F>(
where
F: std::future::Future<Output = anyhow::Result<V>>,
{
use utils::completion;
let mut slot_guard =
tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;

View File

@@ -182,7 +182,7 @@
pub(crate) mod download;
pub mod index;
mod upload;
pub(crate) mod upload;
use anyhow::Context;
use camino::Utf8Path;
@@ -690,7 +690,10 @@ impl RemoteTimelineClient {
.insert(layer.layer_desc().filename(), metadata.clone());
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
info!("scheduled layer file upload {layer}");
info!(
"scheduled layer file upload {layer} gen={:?} shard={:?}",
metadata.generation, metadata.shard
);
let op = UploadOp::UploadLayer(layer, metadata);
self.calls_unfinished_metric_begin(&op);
upload_queue.queued_operations.push_back(op);

View File

@@ -25,7 +25,7 @@ use super::index::LayerFileMetadata;
use tracing::info;
/// Serializes and uploads the given index part data to the remote storage.
pub(super) async fn upload_index_part<'a>(
pub(crate) async fn upload_index_part<'a>(
storage: &'a GenericRemoteStorage,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,