From e9f7510abf54167dcda8445a23e6c2e9de3a0053 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Wed, 3 Jan 2024 14:53:37 +0000
Subject: [PATCH] pageserver: implement shard splitting

---
 pageserver/src/http/routes.rs                |  25 +++
 pageserver/src/tenant.rs                     |  66 +++++++
 pageserver/src/tenant/mgr.rs                 | 164 +++++++++++++++++-
 .../tenant/remote_timeline_client/upload.rs  |   2 +-
 4 files changed, 251 insertions(+), 6 deletions(-)

diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 6744e57139..c5f07844b1 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -15,10 +15,13 @@ use hyper::StatusCode;
 use hyper::{Body, Request, Response, Uri};
 use metrics::launch_timestamp::LaunchTimestamp;
 use pageserver_api::models::TenantDetails;
+use pageserver_api::models::TenantShardSplitRequest;
+use pageserver_api::models::TenantShardSplitResponse;
 use pageserver_api::models::{
     DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
     TenantLoadRequest, TenantLocationConfigRequest,
 };
+use pageserver_api::shard::ShardCount;
 use pageserver_api::shard::TenantShardId;
 use remote_storage::GenericRemoteStorage;
 use tenant_size_model::{SizeResult, StorageModel};
@@ -987,6 +990,25 @@ async fn tenant_size_handler(
     )
 }
 
+async fn tenant_shard_split_handler(
+    mut request: Request<Body>,
+    _cancel: CancellationToken,
+) -> Result<Response<Body>, ApiError> {
+    let req: TenantShardSplitRequest = json_request(&mut request).await?;
+
+    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
+    let state = get_state(&request);
+    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
+
+    let new_shards = state
+        .tenant_manager
+        .shard_split(tenant_shard_id, ShardCount(req.new_shard_count), &ctx)
+        .await
+        .map_err(ApiError::InternalServerError)?;
+
+    json_response(StatusCode::OK, TenantShardSplitResponse { new_shards })
+}
+
 async fn layer_map_info_handler(
     request: Request<Body>,
     _cancel: CancellationToken,
@@ -1797,6 +1819,9 @@ pub fn make_router(
         .put("/v1/tenant/config", |r| {
             api_handler(r, update_tenant_config_handler)
         })
+        .put("/v1/tenant/:tenant_shard_id/shard_split", |r| {
+            api_handler(r, tenant_shard_split_handler)
+        })
         .get("/v1/tenant/:tenant_shard_id/config", |r| {
             api_handler(r, get_tenant_config_handler)
         })
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index e7bed55e70..774ab1b018 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -51,6 +51,7 @@ use self::metadata::TimelineMetadata;
 use self::mgr::GetActiveTenantError;
 use self::mgr::GetTenantError;
 use self::mgr::TenantsMap;
+use self::remote_timeline_client::upload::upload_index_part;
 use self::remote_timeline_client::RemoteTimelineClient;
 use self::timeline::uninit::TimelineExclusionError;
 use self::timeline::uninit::TimelineUninitMark;
@@ -1531,6 +1532,7 @@ impl Tenant {
             })?;
 
         if active_only && !timeline.is_active() {
+            tracing::warn!("Timeline {} is not active", timeline.timeline_id);
             Err(GetTimelineError::NotActive {
                 tenant_id: self.tenant_shard_id.tenant_id,
                 timeline_id,
@@ -2304,6 +2306,66 @@ impl Tenant {
     pub(crate) fn get_generation(&self) -> Generation {
         self.generation
     }
+
+    pub(crate) async fn split_prepare(
+        &self,
+        child_shards: &Vec<TenantShardId>,
+    ) -> anyhow::Result<()> {
+        let timelines = self.timelines.lock().unwrap().clone();
+        for timeline in timelines.values() {
+            let Some(tl_client) = &timeline.remote_client else {
+                anyhow::bail!("Remote storage is mandatory");
+            };
+
+            let Some(remote_storage) = &self.remote_storage else {
+                anyhow::bail!("Remote storage is mandatory");
+            };
+
+            // TODO: some higher level should enforce that timeline creation/deletion does not
+            // happen concurrently with splits. This is impossible to safely coordinate locally
+            // within one single pageserver's view of the world.
+
+            // Upload an index from the parent: this is partly to provide freshness for the
+            // child tenants that will copy it, and partly for general ease-of-debugging: there will
+            // always be a parent shard index in the same generation as we wrote the child shard index.
+            tl_client.schedule_index_upload_for_file_changes()?;
+            tl_client.wait_completion().await?;
+
+            // Shut down the timeline's remote client: this means that the indices we write
+            // for child shards will not be invalidated by the parent shard deleting layers.
+            tl_client.shutdown().await?;
+
+            // Download methods can still be used after shutdown, as they don't flow through the remote client's
+            // queue.
+            // TODO: create a way for remote timeline client to give us a copy of the last IndexPart it uploaded
+            // without having to download it again.
+            // TODO: carry a cancellation token in here
+            let result = tl_client
+                .download_index_file(CancellationToken::new())
+                .instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))
+                .await?;
+            let index_part = match result {
+                MaybeDeletedIndexPart::Deleted(_) => {
+                    anyhow::bail!("Timeline deletion happened concurrently with split")
+                }
+                MaybeDeletedIndexPart::IndexPart(p) => p,
+            };
+
+            for child_shard in child_shards {
+                upload_index_part(
+                    remote_storage,
+                    child_shard,
+                    &timeline.timeline_id,
+                    self.generation,
+                    &index_part,
+                    &self.cancel,
+                )
+                .await?;
+            }
+        }
+
+        Ok(())
+    }
 }
 
 /// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
@@ -3620,6 +3682,10 @@ impl Tenant {
 
         Ok(())
     }
+
+    pub(crate) fn get_tenant_conf(&self) -> TenantConfOpt {
+        self.tenant_conf.read().unwrap().tenant_conf
+    }
 }
 
 fn remove_timeline_and_uninit_mark(
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 61662a36a5..2219266bb5 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -2,6 +2,7 @@
 //! page server.
 
 use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
+use itertools::Itertools;
 use pageserver_api::key::Key;
 use pageserver_api::models::ShardParameters;
 use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId};
@@ -21,7 +22,7 @@ use tokio_util::sync::CancellationToken;
 use tracing::*;
 
 use remote_storage::GenericRemoteStorage;
-use utils::crashsafe;
+use utils::{completion, crashsafe};
 
 use crate::config::PageServerConf;
 use crate::context::{DownloadBehavior, RequestContext};
@@ -619,8 +620,6 @@ pub(crate) async fn shutdown_all_tenants() {
 }
 
 async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
-    use utils::completion;
-
     let mut join_set = JoinSet::new();
 
     // Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
@@ -1180,6 +1179,163 @@ impl TenantManager {
         slot_guard.revert();
         result
     }
+
+    #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), new_shard_count=%new_shard_count.0))]
+    pub(crate) async fn shard_split(
+        &self,
+        tenant_shard_id: TenantShardId,
+        new_shard_count: ShardCount,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<Vec<TenantShardId>> {
+        let tenant = get_tenant(tenant_shard_id, true)?;
+
+        // Plan: identify what the new child shards will be
+        let effective_old_shard_count = std::cmp::max(tenant_shard_id.shard_count.0, 1);
+        if new_shard_count <= ShardCount(effective_old_shard_count) {
+            anyhow::bail!("Requested shard count is not an increase");
+        }
+        let expansion_factor = new_shard_count.0 / effective_old_shard_count;
+        if expansion_factor & (expansion_factor - 1) != 0 {
+            anyhow::bail!("Requested split is not a power of two");
+        }
+
+        // Key mapping is based on a round robin mapping of key hash modulo shard count,
+        // so our child shards are the ones which the same keys would map to.
+        let mut child_shards = Vec::new();
+        for shard_number in 0..ShardNumber(new_shard_count.0).0 {
+            if shard_number % effective_old_shard_count == tenant_shard_id.shard_number.0 {
+                child_shards.push(TenantShardId {
+                    tenant_id: tenant_shard_id.tenant_id,
+                    shard_number: ShardNumber(shard_number),
+                    shard_count: new_shard_count,
+                })
+            }
+        }
+
+        let parent_shard_identity = tenant.shard_identity;
+        let parent_tenant_conf = tenant.get_tenant_conf();
+        let parent_generation = tenant.generation;
+
+        // TODO: write a unit test for this
+        tracing::info!(
+            "Shard {} splits into: {}",
+            tenant_shard_id.to_index(),
+            child_shards
+                .iter()
+                .map(|id| format!("{}", id.to_index()))
+                .join(",")
+        );
+
+        // Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
+        tenant.split_prepare(&child_shards).await?;
+
+        self.resources.deletion_queue_client.flush_advisory();
+
+        // Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
+        drop(tenant);
+        let mut parent_slot_guard =
+            tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
+        let parent = match parent_slot_guard.get_old_value() {
+            Some(TenantSlot::Attached(t)) => t,
+            Some(TenantSlot::Secondary) => anyhow::bail!("Tenant location in secondary mode"),
+            Some(TenantSlot::InProgress(_)) => {
+                unreachable!()
+            }
+            None => {
+                // We don't actually need the parent shard to still be attached to do our work, but it's
+                // a weird enough situation that the caller probably didn't want us to continue working
+                // if they had detached the tenant they requested the split on.
+                anyhow::bail!("Detached parent shard in the middle of split!")
+            }
+        };
+
+        // TODO: hardlink layers from the parent into the child shard directories so that they don't immediately re-download
+        // TODO: erase the dentries from the parent
+
+        // Take a snapshot of where the parent's WAL ingest had got to: we will wait for
+        // child shards to reach this point.
+        let mut target_lsns = HashMap::new();
+        for timeline in parent.timelines.lock().unwrap().clone().values() {
+            target_lsns.insert(timeline.timeline_id, timeline.get_last_record_lsn());
+        }
+
+        // TODO: we should have the parent shard stop its WAL ingest here, it's a waste of resources
+        // and could slow down the children trying to catch up.
+
+        // Phase 3: Spawn the child shards
+        for child_shard in &child_shards {
+            let mut child_shard_identity = parent_shard_identity;
+            child_shard_identity.count = child_shard.shard_count;
+            child_shard_identity.number = child_shard.shard_number;
+
+            let child_location_conf = LocationConf {
+                mode: LocationMode::Attached(AttachedLocationConfig {
+                    generation: parent_generation,
+                    attach_mode: AttachmentMode::Single,
+                }),
+                shard: child_shard_identity,
+                tenant_conf: parent_tenant_conf,
+            };
+
+            self.upsert_location(*child_shard, child_location_conf, None, ctx)
+                .await?;
+        }
+
+        // Phase 4: wait for child shards' WAL ingest to catch up to target LSN
+        for child_shard_id in &child_shards {
+            let child_shard = {
+                let locked = TENANTS.read().unwrap();
+                let peek_slot =
+                    tenant_map_peek_slot(&locked, child_shard_id, TenantSlotPeekMode::Read)?;
+                peek_slot.and_then(|s| s.get_attached()).cloned()
+            };
+            if let Some(t) = child_shard {
+                let timelines = t.timelines.lock().unwrap().clone();
+                for timeline in timelines.values() {
+                    let Some(target_lsn) = target_lsns.get(&timeline.timeline_id) else {
+                        continue;
+                    };
+
+                    tracing::info!(
+                        "Waiting for child shard {}/{} to reach target lsn {}...",
+                        child_shard_id,
+                        timeline.timeline_id,
+                        target_lsn
+                    );
+                    if let Err(e) = timeline.wait_lsn(*target_lsn, ctx).await {
+                        // Failure here might mean shutdown, in any case this part is an optimization
+                        // and we shouldn't hold up the split operation.
+                        tracing::warn!(
+                            "Failed to wait for timeline {} to reach lsn {target_lsn}: {e}",
+                            timeline.timeline_id
+                        );
+                    } else {
+                        tracing::info!(
+                            "Child shard {}/{} reached target lsn {}",
+                            child_shard_id,
+                            timeline.timeline_id,
+                            target_lsn
+                        );
+                    }
+                }
+            }
+        }
+
+        // Phase 5: Shut down the parent shard.
+        let (_guard, progress) = completion::channel();
+        match parent.shutdown(progress, false).await {
+            Ok(()) => {}
+            Err(other) => {
+                other.wait().await;
+            }
+        }
+        parent_slot_guard.drop_old_value()?;
+
+        // Phase 6: Release the InProgress on the parent shard
+        drop(parent_slot_guard);
+
+        Ok(child_shards)
+    }
 }
 
 #[derive(Debug, thiserror::Error)]
@@ -2033,8 +2189,6 @@ async fn remove_tenant_from_memory(
 where
     F: std::future::Future<Output = anyhow::Result<V>>,
 {
-    use utils::completion;
-
     let mut slot_guard =
         tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
 
diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs
index 11c6956875..024335c85a 100644
--- a/pageserver/src/tenant/remote_timeline_client/upload.rs
+++ b/pageserver/src/tenant/remote_timeline_client/upload.rs
@@ -25,7 +25,7 @@ use super::index::LayerFileMetadata;
 use tracing::info;
 
 /// Serializes and uploads the given index part data to the remote storage.
-pub(super) async fn upload_index_part<'a>(
+pub(crate) async fn upload_index_part<'a>(
     storage: &'a GenericRemoteStorage,
     tenant_shard_id: &TenantShardId,
     timeline_id: &TimelineId,
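
Note on the planning step in shard_split: because key placement is key-hash modulo shard count, a parent shard's children are exactly the new shard numbers congruent to the parent's number modulo the old shard count. The patch leaves a unit test for this as a TODO; the following is a minimal, self-contained sketch of that calculation under simplified assumptions. ChildShard and compute_child_shards are hypothetical stand-ins for pageserver_api's TenantShardId/ShardNumber/ShardCount, reduced to bare u8 fields, not the pageserver's actual types or API.

// Standalone sketch of the child-shard planning step, with hypothetical names.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ChildShard {
    number: u8,
    count: u8,
}

/// A parent shard `parent_number` (of `parent_count`, where 0 means "unsharded")
/// splits into the new shard numbers congruent to it modulo the old count.
fn compute_child_shards(
    parent_number: u8,
    parent_count: u8,
    new_count: u8,
) -> Result<Vec<ChildShard>, String> {
    // Unsharded tenants are represented with a count of 0; treat them as one shard.
    let effective_old_count = std::cmp::max(parent_count, 1);
    if new_count <= effective_old_count {
        return Err("Requested shard count is not an increase".to_string());
    }
    let expansion_factor = new_count / effective_old_count;
    if expansion_factor & (expansion_factor - 1) != 0 {
        return Err("Requested split is not a power of two".to_string());
    }

    // Keep every new shard number that maps back to this parent under modulo placement.
    Ok((0..new_count)
        .filter(|n| n % effective_old_count == parent_number)
        .map(|n| ChildShard {
            number: n,
            count: new_count,
        })
        .collect())
}

fn main() {
    // Splitting shard 1 of 2 into a count of 8 should yield children 1, 3, 5 and 7.
    let children = compute_child_shards(1, 2, 8).unwrap();
    assert_eq!(
        children.iter().map(|c| c.number).collect::<Vec<_>>(),
        vec![1, 3, 5, 7]
    );
    println!("{children:?}");
}

For example, splitting an unsharded tenant (count 0) to a count of 4 yields children 0 through 3, since every key the parent owned lands on one of its own children and never on another parent's child.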