storage controller: error handling for shard split

This commit is contained in:
John Spray
2024-03-07 16:59:33 +00:00
parent acd397a5f4
commit 9bdbe2d630
4 changed files with 508 additions and 77 deletions

2
Cargo.lock generated
View File

@@ -282,8 +282,10 @@ dependencies = [
"control_plane",
"diesel",
"diesel_migrations",
"fail",
"futures",
"git-version",
"hex",
"humantime",
"hyper",
"metrics",

View File

@@ -19,8 +19,10 @@ aws-config.workspace = true
aws-sdk-secretsmanager.workspace = true
camino.workspace = true
clap.workspace = true
fail.workspace = true
futures.workspace = true
git-version.workspace = true
hex.workspace = true
hyper.workspace = true
humantime.workspace = true
once_cell.workspace = true

View File

@@ -11,6 +11,9 @@ use diesel::prelude::*;
use diesel::Connection;
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
use pageserver_api::models::TenantConfig;
use pageserver_api::shard::ShardConfigError;
use pageserver_api::shard::ShardIdentity;
use pageserver_api::shard::ShardStripeSize;
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
use serde::{Deserialize, Serialize};
use utils::generation::Generation;
@@ -72,6 +75,14 @@ pub(crate) enum DatabaseError {
Logical(String),
}
#[must_use]
pub(crate) enum AbortShardSplitStatus {
/// We aborted the split in the database by reverting to the parent shards
Aborted,
/// The split had already been persisted.
Complete,
}
pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
impl Persistence {
@@ -570,6 +581,42 @@ impl Persistence {
})
.await
}
/// Used when the remote part of a shard split failed: we will revert the database state to have only
/// the parent shards, with SplitState::Idle.
pub(crate) async fn abort_shard_split(
&self,
split_tenant_id: TenantId,
new_shard_count: ShardCount,
) -> DatabaseResult<AbortShardSplitStatus> {
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| -> DatabaseResult<AbortShardSplitStatus> {
let aborted = conn.transaction(|conn| -> QueryResult<AbortShardSplitStatus> {
// Clear the splitting state on parent shards
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.ne(new_shard_count.literal() as i32))
.set((splitting.eq(0),))
.execute(conn)?;
// Parent shards are already gone: we cannot abort.
if updated == 0 {
return Ok(AbortShardSplitStatus::Complete);
}
// Erase child shards
diesel::delete(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.eq(new_shard_count.literal() as i32))
.execute(conn)?;
Ok(AbortShardSplitStatus::Aborted)
})?;
Ok(aborted)
})
.await
}
}
/// Parts of [`crate::tenant_state::TenantState`] that are stored durably
@@ -604,6 +651,28 @@ pub(crate) struct TenantShardPersistence {
pub(crate) config: String,
}
impl TenantShardPersistence {
pub(crate) fn get_shard_identity(&self) -> Result<ShardIdentity, ShardConfigError> {
if self.shard_count == 0 {
Ok(ShardIdentity::unsharded())
} else {
Ok(ShardIdentity::new(
ShardNumber(self.shard_number as u8),
ShardCount::new(self.shard_count as u8),
ShardStripeSize(self.shard_stripe_size as u32),
)?)
}
}
pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
Ok(TenantShardId {
tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
shard_number: ShardNumber(self.shard_number as u8),
shard_count: ShardCount::new(self.shard_count as u8),
})
}
}
/// Parts of [`crate::node::Node`] that are stored durably
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq)]
#[diesel(table_name = crate::schema::nodes)]

View File

@@ -2,12 +2,11 @@ use std::{
borrow::Cow,
cmp::Ordering,
collections::{BTreeMap, HashMap, HashSet},
str::FromStr,
sync::Arc,
time::{Duration, Instant},
};
use crate::id_lock_map::IdLockMap;
use crate::{id_lock_map::IdLockMap, persistence::AbortShardSplitStatus};
use anyhow::Context;
use control_plane::attachment_service::{
AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
@@ -88,12 +87,16 @@ struct ServiceState {
compute_hook: Arc<ComputeHook>,
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
// Channel for background cleanup from failed operations that require cleanup, such as shard split
abort_tx: tokio::sync::mpsc::UnboundedSender<TenantShardSplitAbort>,
}
impl ServiceState {
fn new(
config: Config,
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
abort_tx: tokio::sync::mpsc::UnboundedSender<TenantShardSplitAbort>,
nodes: HashMap<NodeId, Node>,
tenants: BTreeMap<TenantShardId, TenantState>,
scheduler: Scheduler,
@@ -104,6 +107,7 @@ impl ServiceState {
scheduler,
compute_hook: Arc::new(ComputeHook::new(config)),
result_tx,
abort_tx,
}
}
@@ -186,6 +190,26 @@ enum TenantCreateOrUpdate {
Update(Vec<ShardUpdate>),
}
/// When we tenant shard split operation fails, we may not be able to clean up immediately, because nodes
/// might not be available. We therefore use a queue of abort operations processed in the background.
struct TenantShardSplitAbort {
tenant_id: TenantId,
/// The target shard count in the request that failed
new_shard_count: ShardCount,
/// Until this abort op is complete, no other operations may be done on the tenant
_tenant_lock: tokio::sync::OwnedRwLockWriteGuard<()>,
}
#[derive(thiserror::Error, Debug)]
enum TenantShardSplitAbortError {
#[error(transparent)]
Database(#[from] DatabaseError),
#[error(transparent)]
Remote(#[from] mgmt_api::Error),
#[error("Unavailable")]
Unavailable,
}
struct ShardUpdate {
tenant_shard_id: TenantShardId,
placement_policy: PlacementPolicy,
@@ -641,8 +665,55 @@ impl Service {
}
}
async fn process_aborts(
&self,
mut abort_rx: tokio::sync::mpsc::UnboundedReceiver<TenantShardSplitAbort>,
) {
loop {
// Wait for the next result, or for cancellation
let op = tokio::select! {
r = abort_rx.recv() => {
match r {
Some(op) => {op},
None => {break;}
}
}
_ = self.cancel.cancelled() => {
break;
}
};
// Retry until shutdown: we must keep this request object alive until it is properly
// processed, as it holds a lock guard that prevents other operations trying to do things
// to the tenant while it is in a weird part-split state.
//
// TODO: if a node goes Offline, we should make abort_tenant_shard_split return Ok, and then
// have some logic that insists on a full reconciliation with a node when it goes back to Active.
while !self.cancel.is_cancelled() {
match self.abort_tenant_shard_split(&op).await {
Ok(_) => break,
Err(e) => {
tracing::warn!(
"Failed to abort shard split on {}, will retry: {e}",
op.tenant_id
);
// If a node is unavailable, we hope that it has been properly marked Offline
// when we retry, so that the abort op will succeed. If the abort op is failing
// for some other reason, we will keep retrying forever, or until a human notices
// and does something about it (either fixing a pageserver or restarting the controller).
tokio::time::timeout(Duration::from_secs(5), self.cancel.cancelled())
.await
.ok();
}
}
}
}
}
pub async fn spawn(config: Config, persistence: Arc<Persistence>) -> anyhow::Result<Arc<Self>> {
let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel();
let (abort_tx, abort_rx) = tokio::sync::mpsc::unbounded_channel();
tracing::info!("Loading nodes from database...");
let nodes = persistence
@@ -661,6 +732,34 @@ impl Service {
tenant_shard_persistence.len()
);
// If any shard splits were in progress, reset the database state to abort them
let mut tenant_shard_count_min_max: HashMap<TenantId, (ShardCount, ShardCount)> =
HashMap::new();
for tsp in &tenant_shard_persistence {
let shard = tsp.get_shard_identity()?;
let tenant_shard_id = tsp.get_tenant_shard_id()?;
let entry = tenant_shard_count_min_max
.entry(tenant_shard_id.tenant_id)
.or_insert_with(|| (shard.count, shard.count));
entry.0 = std::cmp::min(entry.0, shard.count);
entry.1 = std::cmp::max(entry.1, shard.count);
}
for (tenant_id, (count_min, count_max)) in tenant_shard_count_min_max {
if count_min != count_max {
// Aborting the split in the database and dropping the child shards is sufficient: the reconciliation in
// [`Self::startup_reconcile`] will implicitly drop the child shards on remote pageservers.
// TODO: (... IF those remote pageservers are available at time of startup reconcile. Otherwise we
// need to add a mechanism that does a reconcile on nodes before marking them Active.)
tracing::info!("Aborting shard split {tenant_id} {count_min:?} -> {count_max:?}");
let abort_status = persistence.abort_shard_split(tenant_id, count_max).await?;
// We may never see the Complete status here: if the split was complete, we wouldn't have
// identified this tenant has having mismatching min/max counts.
assert!(matches!(abort_status, AbortShardSplitStatus::Aborted));
}
}
let mut tenants = BTreeMap::new();
let mut scheduler = Scheduler::new(nodes.values());
@@ -690,21 +789,8 @@ impl Service {
}
}
for tsp in tenant_shard_persistence {
let tenant_shard_id = TenantShardId {
tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
shard_number: ShardNumber(tsp.shard_number as u8),
shard_count: ShardCount::new(tsp.shard_count as u8),
};
let shard_identity = if tsp.shard_count == 0 {
ShardIdentity::unsharded()
} else {
ShardIdentity::new(
ShardNumber(tsp.shard_number as u8),
ShardCount::new(tsp.shard_count as u8),
ShardStripeSize(tsp.shard_stripe_size as u32),
)?
};
let tenant_shard_id = tsp.get_tenant_shard_id()?;
let shard_identity = tsp.get_shard_identity()?;
// We will populate intent properly later in [`Self::startup_reconcile`], initially populate
// it with what we can infer: the node for which a generation was most recently issued.
let mut intent = IntentState::new();
@@ -738,6 +824,7 @@ impl Service {
inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
config.clone(),
result_tx,
abort_tx,
nodes,
tenants,
scheduler,
@@ -758,6 +845,16 @@ impl Service {
}
});
tokio::task::spawn({
let this = this.clone();
async move {
// Block shutdown until we're done (we must respect self.cancel)
if let Ok(_gate) = this.gate.enter() {
this.process_aborts(abort_rx).await
}
}
});
tokio::task::spawn({
let this = this.clone();
async move {
@@ -2187,10 +2284,308 @@ impl Service {
})
}
#[instrument(skip_all, fields(tenant_id=%op.tenant_id))]
async fn abort_tenant_shard_split(
&self,
op: &TenantShardSplitAbort,
) -> Result<(), TenantShardSplitAbortError> {
// Cleaning up a split:
// - Parent shards are not destroyed during a split, just detached.
// - Failed pageserver split API calls can leave the remote node with just the parent attached,
// just the children attached, or both.
//
// Therefore our work to do is to:
// 1. Clean up storage controller's internal state to just refer to parents, no children
// 2. Call out to pageservers to ensure that children are detached
// 3. Call out to pageservers to ensure that parents are attached.
//
// Crash safety:
// - If the storage controller stops running during this cleanup *after* clearing the splitting state
// from our database, then [`Self::startup_reconcile`] will regard child attachments as garbage
// and detach them.
// - TODO: If the storage controller stops running during this cleanup *before* clearing the splitting state
// from our database, then we will re-enter this cleanup routine on startup.
let TenantShardSplitAbort {
tenant_id,
new_shard_count,
..
} = op;
// First abort persistent state, if any exists.
match self
.persistence
.abort_shard_split(*tenant_id, *new_shard_count)
.await?
{
AbortShardSplitStatus::Aborted => {
// Proceed to roll back any child shards created on pageservers
}
AbortShardSplitStatus::Complete => {
// The split completed (we might hit that path if e.g. our database transaction
// to write the completion landed in the database, but we dropped connection
// before seeing the result).
//
// We must update in-memory state to reflect the successful split.
self.tenant_shard_split_commit_inmem(*tenant_id, *new_shard_count);
return Ok(());
}
}
// Clean up in-memory state, and accumulate the list of child locations that need detaching
let detach_locations: Vec<(Node, TenantShardId)> = {
let mut detach_locations = Vec::new();
let mut locked = self.inner.write().unwrap();
let result_tx = locked.result_tx.clone();
let compute_hook = locked.compute_hook.clone();
let (nodes, tenants, _scheduler) = locked.parts_mut();
for (tenant_shard_id, shard) in
tenants.range_mut(TenantShardId::tenant_range(op.tenant_id))
{
if shard.shard.count == op.new_shard_count {
// Surprising: the phase of [`Self::do_tenant_shard_split`] which inserts child shards in-memory
// is infallible, so if we got an error we shouldn't have got that far.
tracing::warn!(
"During split abort, child shard {tenant_shard_id} found in-memory"
);
continue;
}
// Add the children of this shard to this list of things to detach
if let Some(node_id) = shard.intent.get_attached() {
for child_id in tenant_shard_id.split(*new_shard_count) {
detach_locations.push((
nodes
.get(node_id)
.expect("Intent references nonexistent node")
.clone(),
child_id,
));
}
} else {
tracing::warn!(
"During split abort, shard {tenant_shard_id} has no attached location"
);
}
tracing::info!("Restoring parent shard {tenant_shard_id}");
shard.splitting = SplitState::Idle;
shard.maybe_reconcile(
result_tx.clone(),
nodes,
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
);
}
// We don't expect any new_shard_count shards to exist here, but drop them just in case
tenants.retain(|_id, s| s.shard.count != *new_shard_count);
detach_locations
};
for (node, child_id) in detach_locations {
if !node.is_available() {
// An unavailable node cannot be cleaned up now: to avoid blocking forever, we will permit this, and
// rely on the reconciliation that happens when a node transitions to Active to clean up. Since we have
// removed child shards from our in-memory state and database, the reconciliation will implicitly remove
// them from the node.
tracing::warn!("Node {node} unavailable, can't clean up during split abort. It will be cleaned up when it is reactivated.");
continue;
}
// Detach the remote child. If the pageserver split API call is still in progress, this call will get
// a 503 and retry, up to our limit.
tracing::info!("Detaching {child_id} on {node}...");
match node
.with_client_retries(
|client| async move {
let config = LocationConfig {
mode: LocationConfigMode::Detached,
generation: None,
secondary_conf: None,
shard_number: child_id.shard_number.0,
shard_count: child_id.shard_count.literal(),
// Stripe size and tenant config don't matter when detaching
shard_stripe_size: 0,
tenant_conf: TenantConfig::default(),
};
client.location_config(child_id, config, None, false).await
},
&self.config.jwt_token,
1,
10,
Duration::from_secs(5),
&self.cancel,
)
.await
{
Some(Ok(_)) => {}
Some(Err(e)) => {
// We failed to communicate with the remote node. This is problematic: we may be
// leaving it with a rogue child shard.
tracing::warn!(
"Failed to detach child {child_id} from node {node} during abort"
);
return Err(e.into());
}
None => {
// Cancellation: we were shutdown or the node went offline. Shutdown is fine, we'll
// clean up on restart. The node going offline requires a retry.
return Err(TenantShardSplitAbortError::Unavailable);
}
};
}
tracing::info!("Successfully aborted split");
Ok(())
}
/// Infallible final stage of [`Self::tenant_shard_split`]: update the contents
/// of the tenant map to reflect the child shards that exist after the split.
fn tenant_shard_split_commit_inmem(
&self,
tenant_id: TenantId,
new_shard_count: ShardCount,
) -> (
TenantShardSplitResponse,
Vec<(TenantShardId, NodeId, ShardStripeSize)>,
) {
let mut response = TenantShardSplitResponse {
new_shards: Vec::new(),
};
let mut child_locations = Vec::new();
{
let mut locked = self.inner.write().unwrap();
let parent_ids = locked
.tenants
.range(TenantShardId::tenant_range(tenant_id))
.map(|(shard_id, _)| *shard_id)
.collect::<Vec<_>>();
let (_nodes, tenants, scheduler) = locked.parts_mut();
for parent_id in parent_ids {
let child_ids = parent_id.split(new_shard_count);
let (pageserver, generation, policy, parent_ident, config) = {
let mut old_state = tenants
.remove(&parent_id)
.expect("It was present, we just split it");
// A non-splitting state is impossible, because [`Self::tenant_shard_split`] holds
// a TenantId lock and passes it through to [`TenantShardSplitAbort`] in case of cleanup:
// nothing else can clear this.
assert!(matches!(old_state.splitting, SplitState::Splitting));
let old_attached = old_state.intent.get_attached().unwrap();
old_state.intent.clear(scheduler);
let generation = old_state.generation.expect("Shard must have been attached");
(
old_attached,
generation,
old_state.policy,
old_state.shard,
old_state.config,
)
};
for child in child_ids {
let mut child_shard = parent_ident;
child_shard.number = child.shard_number;
child_shard.count = child.shard_count;
let mut child_observed: HashMap<NodeId, ObservedStateLocation> = HashMap::new();
child_observed.insert(
pageserver,
ObservedStateLocation {
conf: Some(attached_location_conf(
generation,
&child_shard,
&config,
matches!(policy, PlacementPolicy::Double(n) if n > 0),
)),
},
);
let mut child_state = TenantState::new(child, child_shard, policy.clone());
child_state.intent = IntentState::single(scheduler, Some(pageserver));
child_state.observed = ObservedState {
locations: child_observed,
};
child_state.generation = Some(generation);
child_state.config = config.clone();
// The child's TenantState::splitting is intentionally left at the default value of Idle,
// as at this point in the split process we have succeeded and this part is infallible:
// we will never need to do any special recovery from this state.
child_locations.push((child, pageserver, child_shard.stripe_size));
if let Err(e) = child_state.schedule(scheduler) {
// This is not fatal, because we've implicitly already got an attached
// location for the child shard. Failure here just means we couldn't
// find a secondary (e.g. because cluster is overloaded).
tracing::warn!("Failed to schedule child shard {child}: {e}");
}
tenants.insert(child, child_state);
response.new_shards.push(child);
}
}
(response, child_locations)
}
}
pub(crate) async fn tenant_shard_split(
&self,
tenant_id: TenantId,
split_req: TenantShardSplitRequest,
) -> Result<TenantShardSplitResponse, ApiError> {
// TODO: return immediately if we can't get exclusive lock? Nicer than timing out. If we can't get it, it's likely
// because of some bogus attempt to retry a previously timed out split, or to split something that is deleting, etc.
let _tenant_lock = self.tenant_locks.exclusive(tenant_id).await;
let new_shard_count = ShardCount::new(split_req.new_shard_count);
let r = self.do_tenant_shard_split(tenant_id, split_req).await;
match r {
Ok(r) => Ok(r),
Err(ApiError::BadRequest(_)) => {
// A request validation error does not require rollback: we rejected it before we started making any changes: just
// return the error
r
}
Err(e) => {
// General case error handling: split might be part-done, we must do work to abort it.
tracing::warn!("Enqueuing backround abort of split on {tenant_id}");
self.inner
.write()
.unwrap()
.abort_tx
.send(TenantShardSplitAbort {
tenant_id,
new_shard_count,
_tenant_lock,
})
// Ignore error sending: that just means we're shutting down: aborts are ephemeral so it's fine to drop it.
.ok();
Err(e)
}
}
}
pub(crate) async fn do_tenant_shard_split(
&self,
tenant_id: TenantId,
split_req: TenantShardSplitRequest,
) -> Result<TenantShardSplitResponse, ApiError> {
let mut policy = None;
let mut shard_ident = None;
@@ -2202,6 +2597,10 @@ impl Service {
child_ids: Vec<TenantShardId>,
}
fail::fail_point!("shard-split-validation", |_| Err(ApiError::BadRequest(
anyhow::anyhow!("failpoint")
)));
// Validate input, and calculate which shards we will create
let (old_shard_count, targets, compute_hook) =
{
@@ -2363,6 +2762,9 @@ impl Service {
_ => return Err(ApiError::InternalServerError(e.into())),
}
}
fail::fail_point!("shard-split-post-begin", |_| Err(
ApiError::InternalServerError(anyhow::anyhow!("failpoint"))
));
// Now that I have persisted the splitting state, apply it in-memory. This is infallible, so
// callers may assume that if splitting is set in memory, then it was persisted, and if splitting
@@ -2372,6 +2774,12 @@ impl Service {
for target in &targets {
if let Some(parent_shard) = locked.tenants.get_mut(&target.parent_id) {
parent_shard.splitting = SplitState::Splitting;
// Put the observed state to None, to reflect that it is indeterminate once we start the
// split operation.
parent_shard
.observed
.locations
.insert(target.node.get_id(), ObservedStateLocation { conf: None });
}
}
}
@@ -2401,6 +2809,10 @@ impl Service {
.await
.map_err(|e| ApiError::Conflict(format!("Failed to split {}: {}", parent_id, e)))?;
fail::fail_point!("shard-split-post-remote", |_| Err(ApiError::Conflict(
"failpoint".to_string()
)));
tracing::info!(
"Split {} into {}",
parent_id,
@@ -2435,67 +2847,13 @@ impl Service {
.complete_shard_split(tenant_id, old_shard_count)
.await?;
fail::fail_point!("shard-split-post-complete", |_| Err(
ApiError::InternalServerError(anyhow::anyhow!("failpoint"))
));
// Replace all the shards we just split with their children: this phase is infallible.
let mut response = TenantShardSplitResponse {
new_shards: Vec::new(),
};
let mut child_locations = Vec::new();
{
let mut locked = self.inner.write().unwrap();
let (_nodes, tenants, scheduler) = locked.parts_mut();
for target in targets {
let SplitTarget {
parent_id,
node: _node,
child_ids,
} = target;
let (pageserver, generation, config) = {
let mut old_state = tenants
.remove(&parent_id)
.expect("It was present, we just split it");
let old_attached = old_state.intent.get_attached().unwrap();
old_state.intent.clear(scheduler);
let generation = old_state.generation.expect("Shard must have been attached");
(old_attached, generation, old_state.config.clone())
};
for child in child_ids {
let mut child_shard = shard_ident;
child_shard.number = child.shard_number;
child_shard.count = child.shard_count;
let mut child_observed: HashMap<NodeId, ObservedStateLocation> = HashMap::new();
child_observed.insert(
pageserver,
ObservedStateLocation {
conf: Some(attached_location_conf(
generation,
&child_shard,
&config,
matches!(policy, PlacementPolicy::Double(n) if n > 0),
)),
},
);
let mut child_state = TenantState::new(child, child_shard, policy.clone());
child_state.intent = IntentState::single(scheduler, Some(pageserver));
child_state.observed = ObservedState {
locations: child_observed,
};
child_state.generation = Some(generation);
child_state.config = config.clone();
// The child's TenantState::splitting is intentionally left at the default value of Idle,
// as at this point in the split process we have succeeded and this part is infallible:
// we will never need to do any special recovery from this state.
child_locations.push((child, pageserver, child_shard.stripe_size));
tenants.insert(child, child_state);
response.new_shards.push(child);
}
}
}
let (response, child_locations) = self
.tenant_shard_split_commit_inmem(tenant_id, ShardCount::new(split_req.new_shard_count));
// Send compute notifications for all the new shards
let mut failed_notifications = Vec::new();