diff --git a/Dockerfile b/Dockerfile
index bb926643dc..c37f94b981 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -100,6 +100,11 @@ RUN mkdir -p /data/.neon/ && chown -R neon:neon /data/.neon/ \
      -c "listen_pg_addr='0.0.0.0:6400'" \
      -c "listen_http_addr='0.0.0.0:9898'"
 
+# When running a binary that links with libpq, default to using our most recent postgres version. Binaries
+# that want a particular postgres version will select it explicitly: this is just a default.
+ENV LD_LIBRARY_PATH /usr/local/v16/lib
+
+
 VOLUME ["/data"]
 USER neon
 EXPOSE 6400
diff --git a/control_plane/attachment_service/migrations/2024-01-07-211257_create_tenant_shards/up.sql b/control_plane/attachment_service/migrations/2024-01-07-211257_create_tenant_shards/up.sql
index 585dbc79a0..2ffdae6287 100644
--- a/control_plane/attachment_service/migrations/2024-01-07-211257_create_tenant_shards/up.sql
+++ b/control_plane/attachment_service/migrations/2024-01-07-211257_create_tenant_shards/up.sql
@@ -7,6 +7,7 @@ CREATE TABLE tenant_shards (
     generation INTEGER NOT NULL,
     generation_pageserver BIGINT NOT NULL,
     placement_policy VARCHAR NOT NULL,
+    splitting SMALLINT NOT NULL,
     -- config is JSON encoded, opaque to the database.
     config TEXT NOT NULL
 );
\ No newline at end of file
diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs
index 049e66fddf..38eecaf7ef 100644
--- a/control_plane/attachment_service/src/http.rs
+++ b/control_plane/attachment_service/src/http.rs
@@ -3,7 +3,8 @@ use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT};
 use hyper::{Body, Request, Response};
 use hyper::{StatusCode, Uri};
 use pageserver_api::models::{
-    TenantCreateRequest, TenantLocationConfigRequest, TimelineCreateRequest,
+    TenantCreateRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
+    TimelineCreateRequest,
 };
 use pageserver_api::shard::TenantShardId;
 use pageserver_client::mgmt_api;
@@ -292,6 +293,19 @@ async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>,
     json_response(StatusCode::OK, state.service.node_configure(config_req)?)
 }
 
+async fn handle_tenant_shard_split(
+    service: Arc<Service>,
+    mut req: Request<Body>,
+) -> Result<Response<Body>, ApiError> {
+    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
+    let split_req = json_request::<TenantShardSplitRequest>(&mut req).await?;
+
+    json_response(
+        StatusCode::OK,
+        service.tenant_shard_split(tenant_id, split_req).await?,
+    )
+}
+
 async fn handle_tenant_shard_migrate(
     service: Arc<Service>,
     mut req: Request<Body>,
@@ -391,6 +405,9 @@ pub fn make_router(
         .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
             tenant_service_handler(r, handle_tenant_shard_migrate)
         })
+        .put("/control/v1/tenant/:tenant_id/shard_split", |r| {
+            tenant_service_handler(r, handle_tenant_shard_split)
+        })
         // Tenant operations
         // The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
         // this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
diff --git a/control_plane/attachment_service/src/persistence.rs b/control_plane/attachment_service/src/persistence.rs index db487bcec6..cead540058 100644 --- a/control_plane/attachment_service/src/persistence.rs +++ b/control_plane/attachment_service/src/persistence.rs @@ -1,7 +1,9 @@ +pub(crate) mod split_state; use std::collections::HashMap; use std::str::FromStr; use std::time::Duration; +use self::split_state::SplitState; use camino::Utf8Path; use camino::Utf8PathBuf; use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy}; @@ -363,19 +365,101 @@ impl Persistence { Ok(()) } - // TODO: when we start shard splitting, we must durably mark the tenant so that - // on restart, we know that we must go through recovery (list shards that exist - // and pick up where we left off and/or revert to parent shards). + // When we start shard splitting, we must durably mark the tenant so that + // on restart, we know that we must go through recovery. + // + // We create the child shards here, so that they will be available for increment_generation calls + // if some pageserver holding a child shard needs to restart before the overall tenant split is complete. #[allow(dead_code)] - pub(crate) async fn begin_shard_split(&self, _tenant_id: TenantId) -> anyhow::Result<()> { - todo!(); + pub(crate) async fn begin_shard_split( + &self, + old_shard_count: ShardCount, + split_tenant_id: TenantId, + parent_to_children: Vec<(TenantShardId, Vec)>, + ) -> DatabaseResult<()> { + use crate::schema::tenant_shards::dsl::*; + self.with_conn(move |conn| -> DatabaseResult<()> { + conn.transaction(|conn| -> DatabaseResult<()> { + // Mark parent shards as splitting + let updated = diesel::update(tenant_shards) + .filter(tenant_id.eq(split_tenant_id.to_string())) + .filter(shard_count.eq(old_shard_count.0 as i32)) + .set((splitting.eq(1),)) + .execute(conn)?; + if ShardCount(updated.try_into().map_err(|_| DatabaseError::Logical(format!("Overflow existing shard count {} while splitting", updated)))?) != old_shard_count { + // Perhaps a deletion or another split raced with this attempt to split, mutating + // the parent shards that we intend to split. In this case the split request should fail. 
+ return Err(DatabaseError::Logical( + format!("Unexpected existing shard count {updated} when preparing tenant for split (expected {old_shard_count:?})") + )); + } + + // FIXME: spurious clone to sidestep closure move rules + let parent_to_children = parent_to_children.clone(); + + // Insert child shards + for (parent_shard_id, children) in parent_to_children { + let mut parent = crate::schema::tenant_shards::table + .filter(tenant_id.eq(parent_shard_id.tenant_id.to_string())) + .filter(shard_number.eq(parent_shard_id.shard_number.0 as i32)) + .filter(shard_count.eq(parent_shard_id.shard_count.0 as i32)) + .load::(conn)?; + let parent = if parent.len() != 1 { + return Err(DatabaseError::Logical(format!( + "Parent shard {parent_shard_id} not found" + ))); + } else { + parent.pop().unwrap() + }; + for mut shard in children { + // Carry the parent's generation into the child + shard.generation = parent.generation; + + debug_assert!(shard.splitting == SplitState::Splitting); + diesel::insert_into(tenant_shards) + .values(shard) + .execute(conn)?; + } + } + + Ok(()) + })?; + + Ok(()) + }) + .await } - // TODO: when we finish shard splitting, we must atomically clean up the old shards + // When we finish shard splitting, we must atomically clean up the old shards // and insert the new shards, and clear the splitting marker. #[allow(dead_code)] - pub(crate) async fn complete_shard_split(&self, _tenant_id: TenantId) -> anyhow::Result<()> { - todo!(); + pub(crate) async fn complete_shard_split( + &self, + split_tenant_id: TenantId, + old_shard_count: ShardCount, + ) -> DatabaseResult<()> { + use crate::schema::tenant_shards::dsl::*; + self.with_conn(move |conn| -> DatabaseResult<()> { + conn.transaction(|conn| -> QueryResult<()> { + // Drop parent shards + diesel::delete(tenant_shards) + .filter(tenant_id.eq(split_tenant_id.to_string())) + .filter(shard_count.eq(old_shard_count.0 as i32)) + .execute(conn)?; + + // Clear sharding flag + let updated = diesel::update(tenant_shards) + .filter(tenant_id.eq(split_tenant_id.to_string())) + .set((splitting.eq(0),)) + .execute(conn)?; + debug_assert!(updated > 0); + + Ok(()) + })?; + + Ok(()) + }) + .await } } @@ -403,6 +487,8 @@ pub(crate) struct TenantShardPersistence { #[serde(default)] pub(crate) placement_policy: String, #[serde(default)] + pub(crate) splitting: SplitState, + #[serde(default)] pub(crate) config: String, } diff --git a/control_plane/attachment_service/src/persistence/split_state.rs b/control_plane/attachment_service/src/persistence/split_state.rs new file mode 100644 index 0000000000..bce1a75843 --- /dev/null +++ b/control_plane/attachment_service/src/persistence/split_state.rs @@ -0,0 +1,46 @@ +use diesel::pg::{Pg, PgValue}; +use diesel::{ + deserialize::FromSql, deserialize::FromSqlRow, expression::AsExpression, serialize::ToSql, + sql_types::Int2, +}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, FromSqlRow, AsExpression)] +#[diesel(sql_type = SplitStateSQLRepr)] +#[derive(Deserialize, Serialize)] +pub enum SplitState { + Idle = 0, + Splitting = 1, +} + +impl Default for SplitState { + fn default() -> Self { + Self::Idle + } +} + +type SplitStateSQLRepr = Int2; + +impl ToSql for SplitState { + fn to_sql<'a>( + &'a self, + out: &'a mut diesel::serialize::Output, + ) -> diesel::serialize::Result { + let raw_value: i16 = *self as i16; + let mut new_out = out.reborrow(); + ToSql::::to_sql(&raw_value, &mut new_out) + } +} + +impl FromSql for SplitState { + fn 
from_sql(pg_value: PgValue) -> diesel::deserialize::Result { + match FromSql::::from_sql(pg_value).map(|v| match v { + 0 => Some(Self::Idle), + 1 => Some(Self::Splitting), + _ => None, + })? { + Some(v) => Ok(v), + None => Err(format!("Invalid SplitState value, was: {:?}", pg_value.as_bytes()).into()), + } + } +} diff --git a/control_plane/attachment_service/src/schema.rs b/control_plane/attachment_service/src/schema.rs index de80fc8f64..db5a957443 100644 --- a/control_plane/attachment_service/src/schema.rs +++ b/control_plane/attachment_service/src/schema.rs @@ -20,6 +20,7 @@ diesel::table! { generation -> Int4, generation_pageserver -> Int8, placement_policy -> Varchar, + splitting -> Int2, config -> Text, } } diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs index 1db1906df8..0ec2b9dc4c 100644 --- a/control_plane/attachment_service/src/service.rs +++ b/control_plane/attachment_service/src/service.rs @@ -1,4 +1,5 @@ use std::{ + cmp::Ordering, collections::{BTreeMap, HashMap}, str::FromStr, sync::Arc, @@ -23,7 +24,7 @@ use pageserver_api::{ models::{ LocationConfig, LocationConfigMode, ShardParameters, TenantConfig, TenantCreateRequest, TenantLocationConfigRequest, TenantLocationConfigResponse, TenantShardLocation, - TimelineCreateRequest, TimelineInfo, + TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo, }, shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId}, }; @@ -40,7 +41,11 @@ use utils::{ use crate::{ compute_hook::{self, ComputeHook}, node::Node, - persistence::{DatabaseError, NodePersistence, Persistence, TenantShardPersistence}, + persistence::{ + split_state::SplitState, DatabaseError, NodePersistence, Persistence, + TenantShardPersistence, + }, + reconciler::attached_location_conf, scheduler::Scheduler, tenant_state::{ IntentState, ObservedState, ObservedStateLocation, ReconcileResult, ReconcileWaitError, @@ -476,6 +481,7 @@ impl Service { generation_pageserver: i64::MAX, placement_policy: serde_json::to_string(&PlacementPolicy::default()).unwrap(), config: serde_json::to_string(&TenantConfig::default()).unwrap(), + splitting: SplitState::default(), }; match self.persistence.insert_tenant_shards(vec![tsp]).await { @@ -718,6 +724,7 @@ impl Service { generation_pageserver: i64::MAX, placement_policy: serde_json::to_string(&placement_policy).unwrap(), config: serde_json::to_string(&create_req.config).unwrap(), + splitting: SplitState::default(), }) .collect(); self.persistence @@ -1100,6 +1107,7 @@ impl Service { self.ensure_attached_wait(tenant_id).await?; // TODO: refuse to do this if shard splitting is in progress + // (https://github.com/neondatabase/neon/issues/6676) let targets = { let locked = self.inner.read().unwrap(); let mut targets = Vec::new(); @@ -1180,6 +1188,7 @@ impl Service { self.ensure_attached_wait(tenant_id).await?; // TODO: refuse to do this if shard splitting is in progress + // (https://github.com/neondatabase/neon/issues/6676) let targets = { let locked = self.inner.read().unwrap(); let mut targets = Vec::new(); @@ -1352,6 +1361,326 @@ impl Service { }) } + pub(crate) async fn tenant_shard_split( + &self, + tenant_id: TenantId, + split_req: TenantShardSplitRequest, + ) -> Result { + let mut policy = None; + let mut shard_ident = None; + + // TODO: put a cancellation token on Service for clean shutdown + let cancel = CancellationToken::new(); + + // A parent shard which will be split + struct SplitTarget { + parent_id: 
TenantShardId, + node: Node, + child_ids: Vec, + } + + // Validate input, and calculate which shards we will create + let (old_shard_count, targets, compute_hook) = { + let locked = self.inner.read().unwrap(); + + let pageservers = locked.nodes.clone(); + + let mut targets = Vec::new(); + + // In case this is a retry, count how many already-split shards we found + let mut children_found = Vec::new(); + let mut old_shard_count = None; + + for (tenant_shard_id, shard) in + locked.tenants.range(TenantShardId::tenant_range(tenant_id)) + { + match shard.shard.count.0.cmp(&split_req.new_shard_count) { + Ordering::Equal => { + // Already split this + children_found.push(*tenant_shard_id); + continue; + } + Ordering::Greater => { + return Err(ApiError::BadRequest(anyhow::anyhow!( + "Requested count {} but already have shards at count {}", + split_req.new_shard_count, + shard.shard.count.0 + ))); + } + Ordering::Less => { + // Fall through: this shard has lower count than requested, + // is a candidate for splitting. + } + } + + match old_shard_count { + None => old_shard_count = Some(shard.shard.count), + Some(old_shard_count) => { + if old_shard_count != shard.shard.count { + // We may hit this case if a caller asked for two splits to + // different sizes, before the first one is complete. + // e.g. 1->2, 2->4, where the 4 call comes while we have a mixture + // of shard_count=1 and shard_count=2 shards in the map. + return Err(ApiError::Conflict( + "Cannot split, currently mid-split".to_string(), + )); + } + } + } + if policy.is_none() { + policy = Some(shard.policy.clone()); + } + if shard_ident.is_none() { + shard_ident = Some(shard.shard); + } + + if tenant_shard_id.shard_count == ShardCount(split_req.new_shard_count) { + tracing::info!( + "Tenant shard {} already has shard count {}", + tenant_shard_id, + split_req.new_shard_count + ); + continue; + } + + let node_id = + shard + .intent + .attached + .ok_or(ApiError::BadRequest(anyhow::anyhow!( + "Cannot split a tenant that is not attached" + )))?; + + let node = pageservers + .get(&node_id) + .expect("Pageservers may not be deleted while referenced"); + + // TODO: if any reconciliation is currently in progress for this shard, wait for it. + + targets.push(SplitTarget { + parent_id: *tenant_shard_id, + node: node.clone(), + child_ids: tenant_shard_id.split(ShardCount(split_req.new_shard_count)), + }); + } + + if targets.is_empty() { + if children_found.len() == split_req.new_shard_count as usize { + return Ok(TenantShardSplitResponse { + new_shards: children_found, + }); + } else { + // No shards found to split, and no existing children found: the + // tenant doesn't exist at all. + return Err(ApiError::NotFound( + anyhow::anyhow!("Tenant {} not found", tenant_id).into(), + )); + } + } + + (old_shard_count, targets, locked.compute_hook.clone()) + }; + + // unwrap safety: we would have returned above if we didn't find at least one shard to split + let old_shard_count = old_shard_count.unwrap(); + let shard_ident = shard_ident.unwrap(); + let policy = policy.unwrap(); + + // FIXME: we have dropped self.inner lock, and not yet written anything to the database: another + // request could occur here, deleting or mutating the tenant. begin_shard_split checks that the + // parent shards exist as expected, but it would be neater to do the above pre-checks within the + // same database transaction rather than pre-check in-memory and then maybe-fail the database write. 
+ // (https://github.com/neondatabase/neon/issues/6676) + + // Before creating any new child shards in memory or on the pageservers, persist them: this + // enables us to ensure that we will always be able to clean up if something goes wrong. This also + // acts as the protection against two concurrent attempts to split: one of them will get a database + // error trying to insert the child shards. + let mut child_tsps = Vec::new(); + for target in &targets { + let mut this_child_tsps = Vec::new(); + for child in &target.child_ids { + let mut child_shard = shard_ident; + child_shard.number = child.shard_number; + child_shard.count = child.shard_count; + + this_child_tsps.push(TenantShardPersistence { + tenant_id: child.tenant_id.to_string(), + shard_number: child.shard_number.0 as i32, + shard_count: child.shard_count.0 as i32, + shard_stripe_size: shard_ident.stripe_size.0 as i32, + // Note: this generation is a placeholder, [`Persistence::begin_shard_split`] will + // populate the correct generation as part of its transaction, to protect us + // against racing with changes in the state of the parent. + generation: 0, + generation_pageserver: target.node.id.0 as i64, + placement_policy: serde_json::to_string(&policy).unwrap(), + // TODO: get the config out of the map + config: serde_json::to_string(&TenantConfig::default()).unwrap(), + splitting: SplitState::Splitting, + }); + } + + child_tsps.push((target.parent_id, this_child_tsps)); + } + + if let Err(e) = self + .persistence + .begin_shard_split(old_shard_count, tenant_id, child_tsps) + .await + { + match e { + DatabaseError::Query(diesel::result::Error::DatabaseError( + DatabaseErrorKind::UniqueViolation, + _, + )) => { + // Inserting a child shard violated a unique constraint: we raced with another call to + // this function + tracing::warn!("Conflicting attempt to split {tenant_id}: {e}"); + return Err(ApiError::Conflict("Tenant is already splitting".into())); + } + _ => return Err(ApiError::InternalServerError(e.into())), + } + } + + // FIXME: we have now committed the shard split state to the database, so any subsequent + // failure needs to roll it back. We will later wrap this function in logic to roll back + // the split if it fails. + // (https://github.com/neondatabase/neon/issues/6676) + + // TODO: issue split calls concurrently (this only matters once we're splitting + // N>1 shards into M shards -- initially we're usually splitting 1 shard into N). + + for target in &targets { + let SplitTarget { + parent_id, + node, + child_ids, + } = target; + let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref()); + let response = client + .tenant_shard_split( + *parent_id, + TenantShardSplitRequest { + new_shard_count: split_req.new_shard_count, + }, + ) + .await + .map_err(|e| ApiError::Conflict(format!("Failed to split {}: {}", parent_id, e)))?; + + tracing::info!( + "Split {} into {}", + parent_id, + response + .new_shards + .iter() + .map(|s| format!("{:?}", s)) + .collect::>() + .join(",") + ); + + if &response.new_shards != child_ids { + // This should never happen: the pageserver should agree with us on how shard splits work. 
+ return Err(ApiError::InternalServerError(anyhow::anyhow!( + "Splitting shard {} resulted in unexpected IDs: {:?} (expected {:?})", + parent_id, + response.new_shards, + child_ids + ))); + } + } + + // TODO: if the pageserver restarted concurrently with our split API call, + // the actual generation of the child shard might differ from the generation + // we expect it to have. In order for our in-database generation to end up + // correct, we should carry the child generation back in the response and apply it here + // in complete_shard_split (and apply the correct generation in memory) + // (or, we can carry generation in the request and reject the request if + // it doesn't match, but that requires more retry logic on this side) + + self.persistence + .complete_shard_split(tenant_id, old_shard_count) + .await?; + + // Replace all the shards we just split with their children + let mut response = TenantShardSplitResponse { + new_shards: Vec::new(), + }; + let mut child_locations = Vec::new(); + { + let mut locked = self.inner.write().unwrap(); + for target in targets { + let SplitTarget { + parent_id, + node: _node, + child_ids, + } = target; + let (pageserver, generation, config) = { + let old_state = locked + .tenants + .remove(&parent_id) + .expect("It was present, we just split it"); + ( + old_state.intent.attached.unwrap(), + old_state.generation, + old_state.config.clone(), + ) + }; + + locked.tenants.remove(&parent_id); + + for child in child_ids { + let mut child_shard = shard_ident; + child_shard.number = child.shard_number; + child_shard.count = child.shard_count; + + let mut child_observed: HashMap = HashMap::new(); + child_observed.insert( + pageserver, + ObservedStateLocation { + conf: Some(attached_location_conf(generation, &child_shard, &config)), + }, + ); + + let mut child_state = TenantState::new(child, child_shard, policy.clone()); + child_state.intent = IntentState::single(Some(pageserver)); + child_state.observed = ObservedState { + locations: child_observed, + }; + child_state.generation = generation; + child_state.config = config.clone(); + + child_locations.push((child, pageserver)); + + locked.tenants.insert(child, child_state); + response.new_shards.push(child); + } + } + } + + // Send compute notifications for all the new shards + let mut failed_notifications = Vec::new(); + for (child_id, child_ps) in child_locations { + if let Err(e) = compute_hook.notify(child_id, child_ps, &cancel).await { + tracing::warn!("Failed to update compute of {}->{} during split, proceeding anyway to complete split ({e})", + child_id, child_ps); + failed_notifications.push(child_id); + } + } + + // If we failed any compute notifications, make a note to retry later. 
+ if !failed_notifications.is_empty() { + let mut locked = self.inner.write().unwrap(); + for failed in failed_notifications { + if let Some(shard) = locked.tenants.get_mut(&failed) { + shard.pending_compute_notification = true; + } + } + } + + Ok(response) + } + pub(crate) async fn tenant_shard_migrate( &self, tenant_shard_id: TenantShardId, diff --git a/control_plane/attachment_service/src/tenant_state.rs b/control_plane/attachment_service/src/tenant_state.rs index a358e1ff7b..c0ab076a55 100644 --- a/control_plane/attachment_service/src/tenant_state.rs +++ b/control_plane/attachment_service/src/tenant_state.rs @@ -193,6 +193,13 @@ impl IntentState { result } + pub(crate) fn single(node_id: Option) -> Self { + Self { + attached: node_id, + secondary: vec![], + } + } + /// When a node goes offline, we update intents to avoid using it /// as their attached pageserver. /// @@ -286,6 +293,9 @@ impl TenantState { // self.intent refers to pageservers that are offline, and pick other // pageservers if so. + // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not + // change their attach location. + // Build the set of pageservers already in use by this tenant, to avoid scheduling // more work on the same pageservers we're already using. let mut used_pageservers = self.intent.all_pageservers(); diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs index a3f832036c..c3e071aa71 100644 --- a/control_plane/src/attachment_service.rs +++ b/control_plane/src/attachment_service.rs @@ -8,7 +8,10 @@ use diesel::{ use diesel_migrations::{HarnessWithOutput, MigrationHarness}; use hyper::Method; use pageserver_api::{ - models::{ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo}, + models::{ + ShardParameters, TenantCreateRequest, TenantShardSplitRequest, TenantShardSplitResponse, + TimelineCreateRequest, TimelineInfo, + }, shard::TenantShardId, }; use pageserver_client::mgmt_api::ResponseErrorMessageExt; @@ -648,7 +651,7 @@ impl AttachmentService { ) -> anyhow::Result { self.dispatch( Method::PUT, - format!("tenant/{tenant_shard_id}/migrate"), + format!("control/v1/tenant/{tenant_shard_id}/migrate"), Some(TenantShardMigrateRequest { tenant_shard_id, node_id, @@ -657,6 +660,20 @@ impl AttachmentService { .await } + #[instrument(skip(self), fields(%tenant_id, %new_shard_count))] + pub async fn tenant_split( + &self, + tenant_id: TenantId, + new_shard_count: u8, + ) -> anyhow::Result { + self.dispatch( + Method::PUT, + format!("control/v1/tenant/{tenant_id}/shard_split"), + Some(TenantShardSplitRequest { new_shard_count }), + ) + .await + } + #[instrument(skip_all, fields(node_id=%req.node_id))] pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> { self.dispatch::<_, ()>(Method::POST, "control/v1/node".to_string(), Some(req)) diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index e56007dd20..b9af467fdf 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -575,6 +575,26 @@ async fn handle_tenant( println!("{tenant_table}"); println!("{shard_table}"); } + Some(("shard-split", matches)) => { + let tenant_id = get_tenant_id(matches, env)?; + let shard_count: u8 = matches.get_one::("shard-count").cloned().unwrap_or(0); + + let attachment_service = AttachmentService::from_env(env); + let result = attachment_service + .tenant_split(tenant_id, shard_count) + .await?; + println!( + "Split tenant {} into shards 
{}", + tenant_id, + result + .new_shards + .iter() + .map(|s| format!("{:?}", s)) + .collect::>() + .join(",") + ); + } + Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name), None => bail!("no tenant subcommand provided"), } @@ -1524,6 +1544,11 @@ fn cli() -> Command { .subcommand(Command::new("status") .about("Human readable summary of the tenant's shards and attachment locations") .arg(tenant_id_arg.clone())) + .subcommand(Command::new("shard-split") + .about("Increase the number of shards in the tenant") + .arg(tenant_id_arg.clone()) + .arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)")) + ) ) .subcommand( Command::new("pageserver") diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index c08cacb822..46324efd43 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -192,6 +192,16 @@ pub struct TimelineCreateRequest { pub pg_version: Option, } +#[derive(Serialize, Deserialize)] +pub struct TenantShardSplitRequest { + pub new_shard_count: u8, +} + +#[derive(Serialize, Deserialize)] +pub struct TenantShardSplitResponse { + pub new_shards: Vec, +} + /// Parameters that apply to all shards in a tenant. Used during tenant creation. #[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs index e27aad8156..322b6c642e 100644 --- a/libs/pageserver_api/src/shard.rs +++ b/libs/pageserver_api/src/shard.rs @@ -88,12 +88,36 @@ impl TenantShardId { pub fn is_unsharded(&self) -> bool { self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0) } + + /// Convenience for dropping the tenant_id and just getting the ShardIndex: this + /// is useful when logging from code that is already in a span that includes tenant ID, to + /// keep messages reasonably terse. pub fn to_index(&self) -> ShardIndex { ShardIndex { shard_number: self.shard_number, shard_count: self.shard_count, } } + + /// Calculate the children of this TenantShardId when splitting the overall tenant into + /// the given number of shards. + pub fn split(&self, new_shard_count: ShardCount) -> Vec { + let effective_old_shard_count = std::cmp::max(self.shard_count.0, 1); + let mut child_shards = Vec::new(); + for shard_number in 0..ShardNumber(new_shard_count.0).0 { + // Key mapping is based on a round robin mapping of key hash modulo shard count, + // so our child shards are the ones which the same keys would map to. 
+ if shard_number % effective_old_shard_count == self.shard_number.0 { + child_shards.push(TenantShardId { + tenant_id: self.tenant_id, + shard_number: ShardNumber(shard_number), + shard_count: new_shard_count, + }) + } + } + + child_shards + } } /// Formatting helper @@ -793,4 +817,108 @@ mod tests { let shard = key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key); assert_eq!(shard, ShardNumber(8)); } + + #[test] + fn shard_id_split() { + let tenant_id = TenantId::generate(); + let parent = TenantShardId::unsharded(tenant_id); + + // Unsharded into 2 + assert_eq!( + parent.split(ShardCount(2)), + vec![ + TenantShardId { + tenant_id, + shard_count: ShardCount(2), + shard_number: ShardNumber(0) + }, + TenantShardId { + tenant_id, + shard_count: ShardCount(2), + shard_number: ShardNumber(1) + } + ] + ); + + // Unsharded into 4 + assert_eq!( + parent.split(ShardCount(4)), + vec![ + TenantShardId { + tenant_id, + shard_count: ShardCount(4), + shard_number: ShardNumber(0) + }, + TenantShardId { + tenant_id, + shard_count: ShardCount(4), + shard_number: ShardNumber(1) + }, + TenantShardId { + tenant_id, + shard_count: ShardCount(4), + shard_number: ShardNumber(2) + }, + TenantShardId { + tenant_id, + shard_count: ShardCount(4), + shard_number: ShardNumber(3) + } + ] + ); + + // count=1 into 2 (check this works the same as unsharded.) + let parent = TenantShardId { + tenant_id, + shard_count: ShardCount(1), + shard_number: ShardNumber(0), + }; + assert_eq!( + parent.split(ShardCount(2)), + vec![ + TenantShardId { + tenant_id, + shard_count: ShardCount(2), + shard_number: ShardNumber(0) + }, + TenantShardId { + tenant_id, + shard_count: ShardCount(2), + shard_number: ShardNumber(1) + } + ] + ); + + // count=2 into count=8 + let parent = TenantShardId { + tenant_id, + shard_count: ShardCount(2), + shard_number: ShardNumber(1), + }; + assert_eq!( + parent.split(ShardCount(8)), + vec![ + TenantShardId { + tenant_id, + shard_count: ShardCount(8), + shard_number: ShardNumber(1) + }, + TenantShardId { + tenant_id, + shard_count: ShardCount(8), + shard_number: ShardNumber(3) + }, + TenantShardId { + tenant_id, + shard_count: ShardCount(8), + shard_number: ShardNumber(5) + }, + TenantShardId { + tenant_id, + shard_count: ShardCount(8), + shard_number: ShardNumber(7) + }, + ] + ); + } } diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs index 8abe58e1a2..200369df90 100644 --- a/pageserver/client/src/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -310,6 +310,22 @@ impl Client { .map_err(Error::ReceiveBody) } + pub async fn tenant_shard_split( + &self, + tenant_shard_id: TenantShardId, + req: TenantShardSplitRequest, + ) -> Result { + let uri = format!( + "{}/v1/tenant/{}/shard_split", + self.mgmt_api_endpoint, tenant_shard_id + ); + self.request(Method::PUT, &uri, req) + .await? 
+ .json() + .await + .map_err(Error::ReceiveBody) + } + pub async fn timeline_list( &self, tenant_shard_id: &TenantShardId, diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index ebcb27fa08..af9a3c7301 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -19,11 +19,14 @@ use pageserver_api::models::ShardParameters; use pageserver_api::models::TenantDetails; use pageserver_api::models::TenantLocationConfigResponse; use pageserver_api::models::TenantShardLocation; +use pageserver_api::models::TenantShardSplitRequest; +use pageserver_api::models::TenantShardSplitResponse; use pageserver_api::models::TenantState; use pageserver_api::models::{ DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest, TenantLoadRequest, TenantLocationConfigRequest, }; +use pageserver_api::shard::ShardCount; use pageserver_api::shard::TenantShardId; use remote_storage::GenericRemoteStorage; use remote_storage::TimeTravelError; @@ -875,7 +878,7 @@ async fn tenant_reset_handler( let state = get_state(&request); state .tenant_manager - .reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), ctx) + .reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), &ctx) .await .map_err(ApiError::InternalServerError)?; @@ -1104,6 +1107,25 @@ async fn tenant_size_handler( ) } +async fn tenant_shard_split_handler( + mut request: Request, + _cancel: CancellationToken, +) -> Result, ApiError> { + let req: TenantShardSplitRequest = json_request(&mut request).await?; + + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + let state = get_state(&request); + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); + + let new_shards = state + .tenant_manager + .shard_split(tenant_shard_id, ShardCount(req.new_shard_count), &ctx) + .await + .map_err(ApiError::InternalServerError)?; + + json_response(StatusCode::OK, TenantShardSplitResponse { new_shards }) +} + async fn layer_map_info_handler( request: Request, _cancel: CancellationToken, @@ -2063,6 +2085,9 @@ pub fn make_router( .put("/v1/tenant/config", |r| { api_handler(r, update_tenant_config_handler) }) + .put("/v1/tenant/:tenant_shard_id/shard_split", |r| { + api_handler(r, tenant_shard_split_handler) + }) .get("/v1/tenant/:tenant_shard_id/config", |r| { api_handler(r, get_tenant_config_handler) }) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index f704f8c0dd..f086f46213 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -53,6 +53,7 @@ use self::metadata::TimelineMetadata; use self::mgr::GetActiveTenantError; use self::mgr::GetTenantError; use self::mgr::TenantsMap; +use self::remote_timeline_client::upload::upload_index_part; use self::remote_timeline_client::RemoteTimelineClient; use self::timeline::uninit::TimelineExclusionError; use self::timeline::uninit::TimelineUninitMark; @@ -2397,6 +2398,67 @@ impl Tenant { pub(crate) fn get_generation(&self) -> Generation { self.generation } + + /// This function partially shuts down the tenant (it shuts down the Timelines) and is fallible, + /// and can leave the tenant in a bad state if it fails. The caller is responsible for + /// resetting this tenant to a valid state if we fail. 
+    pub(crate) async fn split_prepare(
+        &self,
+        child_shards: &Vec<TenantShardId>,
+    ) -> anyhow::Result<()> {
+        let timelines = self.timelines.lock().unwrap().clone();
+        for timeline in timelines.values() {
+            let Some(tl_client) = &timeline.remote_client else {
+                anyhow::bail!("Remote storage is mandatory");
+            };
+
+            let Some(remote_storage) = &self.remote_storage else {
+                anyhow::bail!("Remote storage is mandatory");
+            };
+
+            // We do not block timeline creation/deletion during splits inside the pageserver: it is up to higher levels
+            // to ensure that they do not start a split while a timeline creation or deletion is in progress.
+
+            // Upload an index from the parent: this is partly to provide freshness for the
+            // child tenants that will copy it, and partly for general ease-of-debugging: there will
+            // always be a parent shard index in the same generation as we wrote the child shard index.
+            tl_client.schedule_index_upload_for_file_changes()?;
+            tl_client.wait_completion().await?;
+
+            // Shut down the timeline's remote client: this means that the indices we write
+            // for child shards will not be invalidated by the parent shard deleting layers.
+            tl_client.shutdown().await?;
+
+            // Download methods can still be used after shutdown, as they don't flow through the remote client's
+            // queue. In principle the RemoteTimelineClient could provide this without downloading it, but this
+            // operation is rare, so it's simpler to just download it (and this robustly guarantees that the index
+            // we use here really is the remotely persistent one).
+            let result = tl_client
+                .download_index_file(self.cancel.clone())
+                .instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))
+                .await?;
+            let index_part = match result {
+                MaybeDeletedIndexPart::Deleted(_) => {
+                    anyhow::bail!("Timeline deletion happened concurrently with split")
+                }
+                MaybeDeletedIndexPart::IndexPart(p) => p,
+            };
+
+            for child_shard in child_shards {
+                upload_index_part(
+                    remote_storage,
+                    child_shard,
+                    &timeline.timeline_id,
+                    self.generation,
+                    &index_part,
+                    &self.cancel,
+                )
+                .await?;
+            }
+        }
+
+        Ok(())
+    }
 }
 
 /// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
@@ -3732,6 +3794,10 @@ impl Tenant {
 
         Ok(())
     }
+
+    pub(crate) fn get_tenant_conf(&self) -> TenantConfOpt {
+        self.tenant_conf.read().unwrap().tenant_conf
+    }
 }
 
 fn remove_timeline_and_uninit_mark(
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 5ec910ca3e..9aee39bd35 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -2,6 +2,7 @@
 //! page server.
 
 use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
+use itertools::Itertools;
 use pageserver_api::key::Key;
 use pageserver_api::models::ShardParameters;
 use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId};
@@ -22,7 +23,7 @@ use tokio_util::sync::CancellationToken;
 use tracing::*;
 
 use remote_storage::GenericRemoteStorage;
-use utils::crashsafe;
+use utils::{completion, crashsafe};
 
 use crate::config::PageServerConf;
 use crate::context::{DownloadBehavior, RequestContext};
@@ -644,8 +645,6 @@ pub(crate) async fn shutdown_all_tenants() {
 }
 
 async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
-    use utils::completion;
-
     let mut join_set = JoinSet::new();
 
     // Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
@@ -1200,7 +1199,7 @@ impl TenantManager { &self, tenant_shard_id: TenantShardId, drop_cache: bool, - ctx: RequestContext, + ctx: &RequestContext, ) -> anyhow::Result<()> { let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?; let Some(old_slot) = slot_guard.get_old_value() else { @@ -1253,7 +1252,7 @@ impl TenantManager { None, self.tenants, SpawnMode::Normal, - &ctx, + ctx, )?; slot_guard.upsert(TenantSlot::Attached(tenant))?; @@ -1375,6 +1374,164 @@ impl TenantManager { slot_guard.revert(); result } + + #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), new_shard_count=%new_shard_count.0))] + pub(crate) async fn shard_split( + &self, + tenant_shard_id: TenantShardId, + new_shard_count: ShardCount, + ctx: &RequestContext, + ) -> anyhow::Result> { + let tenant = get_tenant(tenant_shard_id, true)?; + + // Plan: identify what the new child shards will be + let effective_old_shard_count = std::cmp::max(tenant_shard_id.shard_count.0, 1); + if new_shard_count <= ShardCount(effective_old_shard_count) { + anyhow::bail!("Requested shard count is not an increase"); + } + let expansion_factor = new_shard_count.0 / effective_old_shard_count; + if !expansion_factor.is_power_of_two() { + anyhow::bail!("Requested split is not a power of two"); + } + + let parent_shard_identity = tenant.shard_identity; + let parent_tenant_conf = tenant.get_tenant_conf(); + let parent_generation = tenant.generation; + + let child_shards = tenant_shard_id.split(new_shard_count); + tracing::info!( + "Shard {} splits into: {}", + tenant_shard_id.to_index(), + child_shards + .iter() + .map(|id| format!("{}", id.to_index())) + .join(",") + ); + + // Phase 1: Write out child shards' remote index files, in the parent tenant's current generation + if let Err(e) = tenant.split_prepare(&child_shards).await { + // If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might + // have been left in a partially-shut-down state. + tracing::warn!("Failed to prepare for split: {e}, reloading Tenant before returning"); + self.reset_tenant(tenant_shard_id, false, ctx).await?; + return Err(e); + } + + self.resources.deletion_queue_client.flush_advisory(); + + // Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant + drop(tenant); + let mut parent_slot_guard = + tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?; + let parent = match parent_slot_guard.get_old_value() { + Some(TenantSlot::Attached(t)) => t, + Some(TenantSlot::Secondary(_)) => anyhow::bail!("Tenant location in secondary mode"), + Some(TenantSlot::InProgress(_)) => { + // tenant_map_acquire_slot never returns InProgress, if a slot was InProgress + // it would return an error. + unreachable!() + } + None => { + // We don't actually need the parent shard to still be attached to do our work, but it's + // a weird enough situation that the caller probably didn't want us to continue working + // if they had detached the tenant they requested the split on. + anyhow::bail!("Detached parent shard in the middle of split!") + } + }; + + // TODO: hardlink layers from the parent into the child shard directories so that they don't immediately re-download + // TODO: erase the dentries from the parent + + // Take a snapshot of where the parent's WAL ingest had got to: we will wait for + // child shards to reach this point. 
+        let mut target_lsns = HashMap::new();
+        for timeline in parent.timelines.lock().unwrap().clone().values() {
+            target_lsns.insert(timeline.timeline_id, timeline.get_last_record_lsn());
+        }
+
+        // TODO: we should have the parent shard stop its WAL ingest here, it's a waste of resources
+        // and could slow down the children trying to catch up.
+
+        // Phase 3: Spawn the child shards
+        for child_shard in &child_shards {
+            let mut child_shard_identity = parent_shard_identity;
+            child_shard_identity.count = child_shard.shard_count;
+            child_shard_identity.number = child_shard.shard_number;
+
+            let child_location_conf = LocationConf {
+                mode: LocationMode::Attached(AttachedLocationConfig {
+                    generation: parent_generation,
+                    attach_mode: AttachmentMode::Single,
+                }),
+                shard: child_shard_identity,
+                tenant_conf: parent_tenant_conf,
+            };
+
+            self.upsert_location(
+                *child_shard,
+                child_location_conf,
+                None,
+                SpawnMode::Normal,
+                ctx,
+            )
+            .await?;
+        }
+
+        // Phase 4: Wait for the child shards' WAL ingest to catch up to the target LSN
+        for child_shard_id in &child_shards {
+            let child_shard = {
+                let locked = TENANTS.read().unwrap();
+                let peek_slot =
+                    tenant_map_peek_slot(&locked, child_shard_id, TenantSlotPeekMode::Read)?;
+                peek_slot.and_then(|s| s.get_attached()).cloned()
+            };
+            if let Some(t) = child_shard {
+                let timelines = t.timelines.lock().unwrap().clone();
+                for timeline in timelines.values() {
+                    let Some(target_lsn) = target_lsns.get(&timeline.timeline_id) else {
+                        continue;
+                    };
+
+                    tracing::info!(
+                        "Waiting for child shard {}/{} to reach target lsn {}...",
+                        child_shard_id,
+                        timeline.timeline_id,
+                        target_lsn
+                    );
+                    if let Err(e) = timeline.wait_lsn(*target_lsn, ctx).await {
+                        // Failure here might mean shutdown; in any case this part is an optimization
+                        // and we shouldn't hold up the split operation.
+                        tracing::warn!(
+                            "Failed to wait for timeline {} to reach lsn {target_lsn}: {e}",
+                            timeline.timeline_id
+                        );
+                    } else {
+                        tracing::info!(
+                            "Child shard {}/{} reached target lsn {}",
+                            child_shard_id,
+                            timeline.timeline_id,
+                            target_lsn
+                        );
+                    }
+                }
+            }
+        }
+
+        // Phase 5: Shut down the parent shard.
+        let (_guard, progress) = completion::channel();
+        match parent.shutdown(progress, false).await {
+            Ok(()) => {}
+            Err(other) => {
+                other.wait().await;
+            }
+        }
+        parent_slot_guard.drop_old_value()?;
+
+        // Phase 6: Release the InProgress on the parent shard
+        drop(parent_slot_guard);
+
+        Ok(child_shards)
+    }
 }
 
 #[derive(Debug, thiserror::Error)]
@@ -2209,8 +2366,6 @@ async fn remove_tenant_from_memory(
 where
     F: std::future::Future<Output = anyhow::Result<V>>,
 {
-    use utils::completion;
-
     let mut slot_guard =
         tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
 
diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs
index e8ba1d3d6e..c17e27b446 100644
--- a/pageserver/src/tenant/remote_timeline_client/upload.rs
+++ b/pageserver/src/tenant/remote_timeline_client/upload.rs
@@ -27,7 +27,7 @@ use super::index::LayerFileMetadata;
 use tracing::info;
 
 /// Serializes and uploads the given index part data to the remote storage.
-pub(super) async fn upload_index_part<'a>( +pub(crate) async fn upload_index_part<'a>( storage: &'a GenericRemoteStorage, tenant_shard_id: &TenantShardId, timeline_id: &TimelineId, diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 4491655aeb..3d2549a8c3 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4054,7 +4054,7 @@ def logical_replication_sync(subscriber: VanillaPostgres, publisher: Endpoint) - def tenant_get_shards( - env: NeonEnv, tenant_id: TenantId, pageserver_id: Optional[int] + env: NeonEnv, tenant_id: TenantId, pageserver_id: Optional[int] = None ) -> list[tuple[TenantShardId, NeonPageserver]]: """ Helper for when you want to talk to one or more pageservers, and the diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index c16bfc2ec6..805eaa34b0 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -1,6 +1,7 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnvBuilder, + tenant_get_shards, ) from fixtures.remote_storage import s3_storage from fixtures.types import TimelineId @@ -82,4 +83,130 @@ def test_sharding_smoke( ) assert timelines == {env.initial_timeline, timeline_b} - # TODO: test timeline deletion and tenant deletion (depends on change in attachment_service) + +def test_sharding_split_smoke( + neon_env_builder: NeonEnvBuilder, +): + """ + Test the basics of shard splitting: + - The API results in more shards than we started with + - The tenant's data remains readable + + """ + + # We will start with 4 shards and split into 8, then migrate all those + # 8 shards onto separate pageservers + shard_count = 4 + split_shard_count = 8 + neon_env_builder.num_pageservers = split_shard_count + + # 1MiB stripes: enable getting some meaningful data distribution without + # writing large quantities of data in this test. The stripe size is given + # in number of 8KiB pages. + stripe_size = 128 + + # Use S3-compatible remote storage so that we can scrub: this test validates + # that the scrubber doesn't barf when it sees a sharded tenant. + neon_env_builder.enable_pageserver_remote_storage(s3_storage()) + neon_env_builder.enable_scrub_on_exit() + + neon_env_builder.preserve_database_files = True + + env = neon_env_builder.init_start( + initial_tenant_shard_count=shard_count, initial_tenant_shard_stripe_size=stripe_size + ) + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + workload = Workload(env, tenant_id, timeline_id, branch_name="main") + workload.init() + + # Initial data + workload.write_rows(256) + + # Note which pageservers initially hold a shard after tenant creation + pre_split_pageserver_ids = [loc["node_id"] for loc in env.attachment_service.locate(tenant_id)] + + # For pageservers holding a shard, validate their ingest statistics + # reflect a proper splitting of the WAL. 
+ for pageserver in env.pageservers: + if pageserver.id not in pre_split_pageserver_ids: + continue + + metrics = pageserver.http_client().get_metrics_values( + [ + "pageserver_wal_ingest_records_received_total", + "pageserver_wal_ingest_records_committed_total", + "pageserver_wal_ingest_records_filtered_total", + ] + ) + + log.info(f"Pageserver {pageserver.id} metrics: {metrics}") + + # Not everything received was committed + assert ( + metrics["pageserver_wal_ingest_records_received_total"] + > metrics["pageserver_wal_ingest_records_committed_total"] + ) + + # Something was committed + assert metrics["pageserver_wal_ingest_records_committed_total"] > 0 + + # Counts are self consistent + assert ( + metrics["pageserver_wal_ingest_records_received_total"] + == metrics["pageserver_wal_ingest_records_committed_total"] + + metrics["pageserver_wal_ingest_records_filtered_total"] + ) + + # TODO: validate that shards have different sizes + + workload.validate() + + assert len(pre_split_pageserver_ids) == 4 + + env.attachment_service.tenant_shard_split(tenant_id, shard_count=split_shard_count) + + post_split_pageserver_ids = [loc["node_id"] for loc in env.attachment_service.locate(tenant_id)] + # We should have split into 8 shards, on the same 4 pageservers we started on. + assert len(post_split_pageserver_ids) == split_shard_count + assert len(set(post_split_pageserver_ids)) == shard_count + assert set(post_split_pageserver_ids) == set(pre_split_pageserver_ids) + + workload.validate() + + workload.churn_rows(256) + + workload.validate() + + # Run GC on all new shards, to check they don't barf or delete anything that breaks reads + # (compaction was already run as part of churn_rows) + all_shards = tenant_get_shards(env, tenant_id) + for tenant_shard_id, pageserver in all_shards: + pageserver.http_client().timeline_gc(tenant_shard_id, timeline_id, None) + + # Restart all nodes, to check that the newly created shards are durable + for ps in env.pageservers: + ps.restart() + + workload.validate() + + migrate_to_pageserver_ids = list( + set(p.id for p in env.pageservers) - set(pre_split_pageserver_ids) + ) + assert len(migrate_to_pageserver_ids) == split_shard_count - shard_count + + # Migrate shards away from the node where the split happened + for ps_id in pre_split_pageserver_ids: + shards_here = [ + tenant_shard_id + for (tenant_shard_id, pageserver) in all_shards + if pageserver.id == ps_id + ] + assert len(shards_here) == 2 + migrate_shard = shards_here[0] + destination = migrate_to_pageserver_ids.pop() + + log.info(f"Migrating shard {migrate_shard} from {ps_id} to {destination}") + env.neon_cli.tenant_migrate(migrate_shard, destination, timeout_secs=10) + + workload.validate()