diff --git a/Cargo.lock b/Cargo.lock index 7fd9053f62..45397eb4a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -282,8 +282,10 @@ dependencies = [ "control_plane", "diesel", "diesel_migrations", + "fail", "futures", "git-version", + "hex", "humantime", "hyper", "metrics", diff --git a/control_plane/attachment_service/Cargo.toml b/control_plane/attachment_service/Cargo.toml index a5fad7216c..f78f56c480 100644 --- a/control_plane/attachment_service/Cargo.toml +++ b/control_plane/attachment_service/Cargo.toml @@ -19,8 +19,10 @@ aws-config.workspace = true aws-sdk-secretsmanager.workspace = true camino.workspace = true clap.workspace = true +fail.workspace = true futures.workspace = true git-version.workspace = true +hex.workspace = true hyper.workspace = true humantime.workspace = true once_cell.workspace = true diff --git a/control_plane/attachment_service/src/persistence.rs b/control_plane/attachment_service/src/persistence.rs index d5c6d74ebe..ed6a4980d2 100644 --- a/control_plane/attachment_service/src/persistence.rs +++ b/control_plane/attachment_service/src/persistence.rs @@ -11,6 +11,9 @@ use diesel::prelude::*; use diesel::Connection; use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy}; use pageserver_api::models::TenantConfig; +use pageserver_api::shard::ShardConfigError; +use pageserver_api::shard::ShardIdentity; +use pageserver_api::shard::ShardStripeSize; use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId}; use serde::{Deserialize, Serialize}; use utils::generation::Generation; @@ -72,6 +75,14 @@ pub(crate) enum DatabaseError { Logical(String), } +#[must_use] +pub(crate) enum AbortShardSplitStatus { + /// We aborted the split in the database by reverting to the parent shards + Aborted, + /// The split had already been persisted. 
+ Complete, +} + pub(crate) type DatabaseResult = Result; impl Persistence { @@ -570,6 +581,42 @@ impl Persistence { }) .await } + + /// Used when the remote part of a shard split failed: we will revert the database state to have only + /// the parent shards, with SplitState::Idle. + pub(crate) async fn abort_shard_split( + &self, + split_tenant_id: TenantId, + new_shard_count: ShardCount, + ) -> DatabaseResult { + use crate::schema::tenant_shards::dsl::*; + self.with_conn(move |conn| -> DatabaseResult { + let aborted = conn.transaction(|conn| -> QueryResult { + // Clear the splitting state on parent shards + let updated = diesel::update(tenant_shards) + .filter(tenant_id.eq(split_tenant_id.to_string())) + .filter(shard_count.ne(new_shard_count.literal() as i32)) + .set((splitting.eq(0),)) + .execute(conn)?; + + // Parent shards are already gone: we cannot abort. + if updated == 0 { + return Ok(AbortShardSplitStatus::Complete); + } + + // Erase child shards + diesel::delete(tenant_shards) + .filter(tenant_id.eq(split_tenant_id.to_string())) + .filter(shard_count.eq(new_shard_count.literal() as i32)) + .execute(conn)?; + + Ok(AbortShardSplitStatus::Aborted) + })?; + + Ok(aborted) + }) + .await + } } /// Parts of [`crate::tenant_state::TenantState`] that are stored durably @@ -604,6 +651,28 @@ pub(crate) struct TenantShardPersistence { pub(crate) config: String, } +impl TenantShardPersistence { + pub(crate) fn get_shard_identity(&self) -> Result { + if self.shard_count == 0 { + Ok(ShardIdentity::unsharded()) + } else { + Ok(ShardIdentity::new( + ShardNumber(self.shard_number as u8), + ShardCount::new(self.shard_count as u8), + ShardStripeSize(self.shard_stripe_size as u32), + )?) 
+ } + } + + pub(crate) fn get_tenant_shard_id(&self) -> Result { + Ok(TenantShardId { + tenant_id: TenantId::from_str(self.tenant_id.as_str())?, + shard_number: ShardNumber(self.shard_number as u8), + shard_count: ShardCount::new(self.shard_count as u8), + }) + } +} + /// Parts of [`crate::node::Node`] that are stored durably #[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)] #[diesel(table_name = crate::schema::nodes)] diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs index 1c9e5ae511..828ec460fd 100644 --- a/control_plane/attachment_service/src/service.rs +++ b/control_plane/attachment_service/src/service.rs @@ -2,12 +2,11 @@ use std::{ borrow::Cow, cmp::Ordering, collections::{BTreeMap, HashMap, HashSet}, - str::FromStr, sync::Arc, time::{Duration, Instant}, }; -use crate::id_lock_map::IdLockMap; +use crate::{id_lock_map::IdLockMap, persistence::AbortShardSplitStatus}; use anyhow::Context; use control_plane::attachment_service::{ AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse, @@ -88,12 +87,16 @@ struct ServiceState { compute_hook: Arc, result_tx: tokio::sync::mpsc::UnboundedSender, + + // Channel for background cleanup from failed operations that require cleanup, such as shard split + abort_tx: tokio::sync::mpsc::UnboundedSender, } impl ServiceState { fn new( config: Config, result_tx: tokio::sync::mpsc::UnboundedSender, + abort_tx: tokio::sync::mpsc::UnboundedSender, nodes: HashMap, tenants: BTreeMap, scheduler: Scheduler, @@ -104,6 +107,7 @@ impl ServiceState { scheduler, compute_hook: Arc::new(ComputeHook::new(config)), result_tx, + abort_tx, } } @@ -186,6 +190,26 @@ enum TenantCreateOrUpdate { Update(Vec), } +/// When a tenant shard split operation fails, we may not be able to clean up immediately, because nodes +/// might not be available. We therefore use a queue of abort operations processed in the background. 
+struct TenantShardSplitAbort { + tenant_id: TenantId, + /// The target shard count in the request that failed + new_shard_count: ShardCount, + /// Until this abort op is complete, no other operations may be done on the tenant + _tenant_lock: tokio::sync::OwnedRwLockWriteGuard<()>, +} + +#[derive(thiserror::Error, Debug)] +enum TenantShardSplitAbortError { + #[error(transparent)] + Database(#[from] DatabaseError), + #[error(transparent)] + Remote(#[from] mgmt_api::Error), + #[error("Unavailable")] + Unavailable, +} + struct ShardUpdate { tenant_shard_id: TenantShardId, placement_policy: PlacementPolicy, @@ -641,8 +665,55 @@ impl Service { } } + async fn process_aborts( + &self, + mut abort_rx: tokio::sync::mpsc::UnboundedReceiver, + ) { + loop { + // Wait for the next result, or for cancellation + let op = tokio::select! { + r = abort_rx.recv() => { + match r { + Some(op) => {op}, + None => {break;} + } + } + _ = self.cancel.cancelled() => { + break; + } + }; + + // Retry until shutdown: we must keep this request object alive until it is properly + // processed, as it holds a lock guard that prevents other operations trying to do things + // to the tenant while it is in a weird part-split state. + // + // TODO: if a node goes Offline, we should make abort_tenant_shard_split return Ok, and then + // have some logic that insists on a full reconciliation with a node when it goes back to Active. + while !self.cancel.is_cancelled() { + match self.abort_tenant_shard_split(&op).await { + Ok(_) => break, + Err(e) => { + tracing::warn!( + "Failed to abort shard split on {}, will retry: {e}", + op.tenant_id + ); + + // If a node is unavailable, we hope that it has been properly marked Offline + // when we retry, so that the abort op will succeed. If the abort op is failing + // for some other reason, we will keep retrying forever, or until a human notices + // and does something about it (either fixing a pageserver or restarting the controller). 
+ tokio::time::timeout(Duration::from_secs(5), self.cancel.cancelled()) + .await + .ok(); + } + } + } + } + } + pub async fn spawn(config: Config, persistence: Arc) -> anyhow::Result> { let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel(); + let (abort_tx, abort_rx) = tokio::sync::mpsc::unbounded_channel(); tracing::info!("Loading nodes from database..."); let nodes = persistence @@ -661,6 +732,34 @@ impl Service { tenant_shard_persistence.len() ); + // If any shard splits were in progress, reset the database state to abort them + let mut tenant_shard_count_min_max: HashMap = + HashMap::new(); + for tsp in &tenant_shard_persistence { + let shard = tsp.get_shard_identity()?; + let tenant_shard_id = tsp.get_tenant_shard_id()?; + let entry = tenant_shard_count_min_max + .entry(tenant_shard_id.tenant_id) + .or_insert_with(|| (shard.count, shard.count)); + entry.0 = std::cmp::min(entry.0, shard.count); + entry.1 = std::cmp::max(entry.1, shard.count); + } + + for (tenant_id, (count_min, count_max)) in tenant_shard_count_min_max { + if count_min != count_max { + // Aborting the split in the database and dropping the child shards is sufficient: the reconciliation in + // [`Self::startup_reconcile`] will implicitly drop the child shards on remote pageservers. + // TODO: (... IF those remote pageservers are available at time of startup reconcile. Otherwise we + // need to add a mechanism that does a reconcile on nodes before marking them Active.) + tracing::info!("Aborting shard split {tenant_id} {count_min:?} -> {count_max:?}"); + let abort_status = persistence.abort_shard_split(tenant_id, count_max).await?; + + // We should never see the Complete status here: if the split was complete, we wouldn't have + // identified this tenant as having mismatching min/max counts. 
+ assert!(matches!(abort_status, AbortShardSplitStatus::Aborted)); + } + } + let mut tenants = BTreeMap::new(); let mut scheduler = Scheduler::new(nodes.values()); @@ -690,21 +789,8 @@ impl Service { } } for tsp in tenant_shard_persistence { - let tenant_shard_id = TenantShardId { - tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?, - shard_number: ShardNumber(tsp.shard_number as u8), - shard_count: ShardCount::new(tsp.shard_count as u8), - }; - let shard_identity = if tsp.shard_count == 0 { - ShardIdentity::unsharded() - } else { - ShardIdentity::new( - ShardNumber(tsp.shard_number as u8), - ShardCount::new(tsp.shard_count as u8), - ShardStripeSize(tsp.shard_stripe_size as u32), - )? - }; - + let tenant_shard_id = tsp.get_tenant_shard_id()?; + let shard_identity = tsp.get_shard_identity()?; // We will populate intent properly later in [`Self::startup_reconcile`], initially populate // it with what we can infer: the node for which a generation was most recently issued. let mut intent = IntentState::new(); @@ -738,6 +824,7 @@ impl Service { inner: Arc::new(std::sync::RwLock::new(ServiceState::new( config.clone(), result_tx, + abort_tx, nodes, tenants, scheduler, @@ -758,6 +845,16 @@ impl Service { } }); + tokio::task::spawn({ + let this = this.clone(); + async move { + // Block shutdown until we're done (we must respect self.cancel) + if let Ok(_gate) = this.gate.enter() { + this.process_aborts(abort_rx).await + } + } + }); + tokio::task::spawn({ let this = this.clone(); async move { @@ -2187,10 +2284,308 @@ impl Service { }) } + #[instrument(skip_all, fields(tenant_id=%op.tenant_id))] + async fn abort_tenant_shard_split( + &self, + op: &TenantShardSplitAbort, + ) -> Result<(), TenantShardSplitAbortError> { + // Cleaning up a split: + // - Parent shards are not destroyed during a split, just detached. + // - Failed pageserver split API calls can leave the remote node with just the parent attached, + // just the children attached, or both. 
+ // + // Therefore our work to do is to: + // 1. Clean up storage controller's internal state to just refer to parents, no children + // 2. Call out to pageservers to ensure that children are detached + // 3. Call out to pageservers to ensure that parents are attached. + // + // Crash safety: + // - If the storage controller stops running during this cleanup *after* clearing the splitting state + // from our database, then [`Self::startup_reconcile`] will regard child attachments as garbage + // and detach them. + // - TODO: If the storage controller stops running during this cleanup *before* clearing the splitting state + // from our database, then we will re-enter this cleanup routine on startup. + + let TenantShardSplitAbort { + tenant_id, + new_shard_count, + .. + } = op; + + // First abort persistent state, if any exists. + match self + .persistence + .abort_shard_split(*tenant_id, *new_shard_count) + .await? + { + AbortShardSplitStatus::Aborted => { + // Proceed to roll back any child shards created on pageservers + } + AbortShardSplitStatus::Complete => { + // The split completed (we might hit that path if e.g. our database transaction + // to write the completion landed in the database, but we dropped connection + // before seeing the result). + // + // We must update in-memory state to reflect the successful split. 
+ self.tenant_shard_split_commit_inmem(*tenant_id, *new_shard_count); + return Ok(()); + } + } + + // Clean up in-memory state, and accumulate the list of child locations that need detaching + let detach_locations: Vec<(Node, TenantShardId)> = { + let mut detach_locations = Vec::new(); + let mut locked = self.inner.write().unwrap(); + let result_tx = locked.result_tx.clone(); + let compute_hook = locked.compute_hook.clone(); + let (nodes, tenants, _scheduler) = locked.parts_mut(); + + for (tenant_shard_id, shard) in + tenants.range_mut(TenantShardId::tenant_range(op.tenant_id)) + { + if shard.shard.count == op.new_shard_count { + // Surprising: the phase of [`Self::do_tenant_shard_split`] which inserts child shards in-memory + // is infallible, so if we got an error we shouldn't have got that far. + tracing::warn!( + "During split abort, child shard {tenant_shard_id} found in-memory" + ); + continue; + } + + // Add the children of this shard to this list of things to detach + if let Some(node_id) = shard.intent.get_attached() { + for child_id in tenant_shard_id.split(*new_shard_count) { + detach_locations.push(( + nodes + .get(node_id) + .expect("Intent references nonexistent node") + .clone(), + child_id, + )); + } + } else { + tracing::warn!( + "During split abort, shard {tenant_shard_id} has no attached location" + ); + } + + tracing::info!("Restoring parent shard {tenant_shard_id}"); + shard.splitting = SplitState::Idle; + shard.maybe_reconcile( + result_tx.clone(), + nodes, + &compute_hook, + &self.config, + &self.persistence, + &self.gate, + &self.cancel, + ); + } + + // We don't expect any new_shard_count shards to exist here, but drop them just in case + tenants.retain(|_id, s| s.shard.count != *new_shard_count); + + detach_locations + }; + + for (node, child_id) in detach_locations { + if !node.is_available() { + // An unavailable node cannot be cleaned up now: to avoid blocking forever, we will permit this, and + // rely on the reconciliation that happens 
when a node transitions to Active to clean up. Since we have + // removed child shards from our in-memory state and database, the reconciliation will implicitly remove + // them from the node. + tracing::warn!("Node {node} unavailable, can't clean up during split abort. It will be cleaned up when it is reactivated."); + continue; + } + + // Detach the remote child. If the pageserver split API call is still in progress, this call will get + // a 503 and retry, up to our limit. + tracing::info!("Detaching {child_id} on {node}..."); + match node + .with_client_retries( + |client| async move { + let config = LocationConfig { + mode: LocationConfigMode::Detached, + generation: None, + secondary_conf: None, + shard_number: child_id.shard_number.0, + shard_count: child_id.shard_count.literal(), + // Stripe size and tenant config don't matter when detaching + shard_stripe_size: 0, + tenant_conf: TenantConfig::default(), + }; + + client.location_config(child_id, config, None, false).await + }, + &self.config.jwt_token, + 1, + 10, + Duration::from_secs(5), + &self.cancel, + ) + .await + { + Some(Ok(_)) => {} + Some(Err(e)) => { + // We failed to communicate with the remote node. This is problematic: we may be + // leaving it with a rogue child shard. + tracing::warn!( + "Failed to detach child {child_id} from node {node} during abort" + ); + return Err(e.into()); + } + None => { + // Cancellation: we were shutdown or the node went offline. Shutdown is fine, we'll + // clean up on restart. The node going offline requires a retry. + return Err(TenantShardSplitAbortError::Unavailable); + } + }; + } + + tracing::info!("Successfully aborted split"); + Ok(()) + } + + /// Infallible final stage of [`Self::tenant_shard_split`]: update the contents + /// of the tenant map to reflect the child shards that exist after the split. 
+ fn tenant_shard_split_commit_inmem( + &self, + tenant_id: TenantId, + new_shard_count: ShardCount, + ) -> ( + TenantShardSplitResponse, + Vec<(TenantShardId, NodeId, ShardStripeSize)>, + ) { + let mut response = TenantShardSplitResponse { + new_shards: Vec::new(), + }; + let mut child_locations = Vec::new(); + { + let mut locked = self.inner.write().unwrap(); + + let parent_ids = locked + .tenants + .range(TenantShardId::tenant_range(tenant_id)) + .map(|(shard_id, _)| *shard_id) + .collect::>(); + + let (_nodes, tenants, scheduler) = locked.parts_mut(); + for parent_id in parent_ids { + let child_ids = parent_id.split(new_shard_count); + + let (pageserver, generation, policy, parent_ident, config) = { + let mut old_state = tenants + .remove(&parent_id) + .expect("It was present, we just split it"); + + // A non-splitting state is impossible, because [`Self::tenant_shard_split`] holds + // a TenantId lock and passes it through to [`TenantShardSplitAbort`] in case of cleanup: + // nothing else can clear this. 
+ assert!(matches!(old_state.splitting, SplitState::Splitting)); + + let old_attached = old_state.intent.get_attached().unwrap(); + old_state.intent.clear(scheduler); + let generation = old_state.generation.expect("Shard must have been attached"); + ( + old_attached, + generation, + old_state.policy, + old_state.shard, + old_state.config, + ) + }; + + for child in child_ids { + let mut child_shard = parent_ident; + child_shard.number = child.shard_number; + child_shard.count = child.shard_count; + + let mut child_observed: HashMap = HashMap::new(); + child_observed.insert( + pageserver, + ObservedStateLocation { + conf: Some(attached_location_conf( + generation, + &child_shard, + &config, + matches!(policy, PlacementPolicy::Double(n) if n > 0), + )), + }, + ); + + let mut child_state = TenantState::new(child, child_shard, policy.clone()); + child_state.intent = IntentState::single(scheduler, Some(pageserver)); + child_state.observed = ObservedState { + locations: child_observed, + }; + child_state.generation = Some(generation); + child_state.config = config.clone(); + + // The child's TenantState::splitting is intentionally left at the default value of Idle, + // as at this point in the split process we have succeeded and this part is infallible: + // we will never need to do any special recovery from this state. + + child_locations.push((child, pageserver, child_shard.stripe_size)); + + if let Err(e) = child_state.schedule(scheduler) { + // This is not fatal, because we've implicitly already got an attached + // location for the child shard. Failure here just means we couldn't + // find a secondary (e.g. because cluster is overloaded). 
+ tracing::warn!("Failed to schedule child shard {child}: {e}"); + } + + tenants.insert(child, child_state); + response.new_shards.push(child); + } + } + + (response, child_locations) + } + } + pub(crate) async fn tenant_shard_split( &self, tenant_id: TenantId, split_req: TenantShardSplitRequest, + ) -> Result { + // TODO: return immediately if we can't get exclusive lock? Nicer than timing out. If we can't get it, it's likely + // because of some bogus attempt to retry a previously timed out split, or to split something that is deleting, etc. + let _tenant_lock = self.tenant_locks.exclusive(tenant_id).await; + + let new_shard_count = ShardCount::new(split_req.new_shard_count); + + let r = self.do_tenant_shard_split(tenant_id, split_req).await; + + match r { + Ok(r) => Ok(r), + Err(ApiError::BadRequest(_)) => { + // A request validation error does not require rollback: we rejected it before we started making any changes: just + // return the error + r + } + Err(e) => { + // General case error handling: split might be part-done, we must do work to abort it. + tracing::warn!("Enqueuing backround abort of split on {tenant_id}"); + self.inner + .write() + .unwrap() + .abort_tx + .send(TenantShardSplitAbort { + tenant_id, + new_shard_count, + _tenant_lock, + }) + // Ignore error sending: that just means we're shutting down: aborts are ephemeral so it's fine to drop it. 
+ .ok(); + Err(e) + } + } + } + + pub(crate) async fn do_tenant_shard_split( + &self, + tenant_id: TenantId, + split_req: TenantShardSplitRequest, ) -> Result { let mut policy = None; let mut shard_ident = None; @@ -2202,6 +2597,10 @@ impl Service { child_ids: Vec, } + fail::fail_point!("shard-split-validation", |_| Err(ApiError::BadRequest( + anyhow::anyhow!("failpoint") + ))); + // Validate input, and calculate which shards we will create let (old_shard_count, targets, compute_hook) = { @@ -2363,6 +2762,9 @@ impl Service { _ => return Err(ApiError::InternalServerError(e.into())), } } + fail::fail_point!("shard-split-post-begin", |_| Err( + ApiError::InternalServerError(anyhow::anyhow!("failpoint")) + )); // Now that I have persisted the splitting state, apply it in-memory. This is infallible, so // callers may assume that if splitting is set in memory, then it was persisted, and if splitting @@ -2372,6 +2774,12 @@ impl Service { for target in &targets { if let Some(parent_shard) = locked.tenants.get_mut(&target.parent_id) { parent_shard.splitting = SplitState::Splitting; + // Put the observed state to None, to reflect that it is indeterminate once we start the + // split operation. + parent_shard + .observed + .locations + .insert(target.node.get_id(), ObservedStateLocation { conf: None }); } } } @@ -2401,6 +2809,10 @@ impl Service { .await .map_err(|e| ApiError::Conflict(format!("Failed to split {}: {}", parent_id, e)))?; + fail::fail_point!("shard-split-post-remote", |_| Err(ApiError::Conflict( + "failpoint".to_string() + ))); + tracing::info!( "Split {} into {}", parent_id, @@ -2435,67 +2847,13 @@ impl Service { .complete_shard_split(tenant_id, old_shard_count) .await?; + fail::fail_point!("shard-split-post-complete", |_| Err( + ApiError::InternalServerError(anyhow::anyhow!("failpoint")) + )); + // Replace all the shards we just split with their children: this phase is infallible. 
- let mut response = TenantShardSplitResponse { - new_shards: Vec::new(), - }; - let mut child_locations = Vec::new(); - { - let mut locked = self.inner.write().unwrap(); - let (_nodes, tenants, scheduler) = locked.parts_mut(); - for target in targets { - let SplitTarget { - parent_id, - node: _node, - child_ids, - } = target; - let (pageserver, generation, config) = { - let mut old_state = tenants - .remove(&parent_id) - .expect("It was present, we just split it"); - let old_attached = old_state.intent.get_attached().unwrap(); - old_state.intent.clear(scheduler); - let generation = old_state.generation.expect("Shard must have been attached"); - (old_attached, generation, old_state.config.clone()) - }; - - for child in child_ids { - let mut child_shard = shard_ident; - child_shard.number = child.shard_number; - child_shard.count = child.shard_count; - - let mut child_observed: HashMap = HashMap::new(); - child_observed.insert( - pageserver, - ObservedStateLocation { - conf: Some(attached_location_conf( - generation, - &child_shard, - &config, - matches!(policy, PlacementPolicy::Double(n) if n > 0), - )), - }, - ); - - let mut child_state = TenantState::new(child, child_shard, policy.clone()); - child_state.intent = IntentState::single(scheduler, Some(pageserver)); - child_state.observed = ObservedState { - locations: child_observed, - }; - child_state.generation = Some(generation); - child_state.config = config.clone(); - - // The child's TenantState::splitting is intentionally left at the default value of Idle, - // as at this point in the split process we have succeeded and this part is infallible: - // we will never need to do any special recovery from this state. 
- - child_locations.push((child, pageserver, child_shard.stripe_size)); - - tenants.insert(child, child_state); - response.new_shards.push(child); - } - } - } + let (response, child_locations) = self + .tenant_shard_split_commit_inmem(tenant_id, ShardCount::new(split_req.new_shard_count)); // Send compute notifications for all the new shards let mut failed_notifications = Vec::new();