mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 13:32:57 +00:00
storage controller: use SERIALIZABLE isolation level (#7792)
## Problem The storage controller generally assumes that things like updating generation numbers are atomic: it should use a strict isolation level. ## Summary of changes - Wrap all database operations in a SERIALIZABLE transaction. - Retry serialization failures, as these do not indicate problems and are normal when plenty of concurrent work is happening. Using this isolation level for all reads is overkill, but much simpler than reasoning about it on a per-operation basis, and does not hurt performance. Tested this with a modified version of storage_controller_many_tenants test with 128k shards, to check that our performance is still fine: it is.
This commit is contained in:
@@ -173,7 +173,7 @@ impl Persistence {
|
||||
/// Wraps `with_conn` in order to collect latency and error metrics
|
||||
async fn with_measured_conn<F, R>(&self, op: DatabaseOperation, func: F) -> DatabaseResult<R>
|
||||
where
|
||||
F: FnOnce(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
|
||||
F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
|
||||
R: Send + 'static,
|
||||
{
|
||||
let latency = &METRICS_REGISTRY
|
||||
@@ -199,13 +199,48 @@ impl Persistence {
|
||||
/// Call the provided function in a tokio blocking thread, with a Diesel database connection.
|
||||
async fn with_conn<F, R>(&self, func: F) -> DatabaseResult<R>
|
||||
where
|
||||
F: FnOnce(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
|
||||
F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
|
||||
R: Send + 'static,
|
||||
{
|
||||
// A generous allowance for how many times we may retry serializable transactions
|
||||
// before giving up. This is not expected to be hit: it is a defensive measure in case we
|
||||
// somehow engineer a situation where duelling transactions might otherwise live-lock.
|
||||
const MAX_RETRIES: usize = 128;
|
||||
|
||||
let mut conn = self.connection_pool.get()?;
|
||||
tokio::task::spawn_blocking(move || -> DatabaseResult<R> { func(&mut conn) })
|
||||
.await
|
||||
.expect("Task panic")
|
||||
tokio::task::spawn_blocking(move || -> DatabaseResult<R> {
|
||||
let mut retry_count = 0;
|
||||
loop {
|
||||
match conn.build_transaction().serializable().run(|c| func(c)) {
|
||||
Ok(r) => break Ok(r),
|
||||
Err(
|
||||
err @ DatabaseError::Query(diesel::result::Error::DatabaseError(
|
||||
diesel::result::DatabaseErrorKind::SerializationFailure,
|
||||
_,
|
||||
)),
|
||||
) => {
|
||||
retry_count += 1;
|
||||
if retry_count > MAX_RETRIES {
|
||||
tracing::error!(
|
||||
"Exceeded max retries on SerializationFailure errors: {err:?}"
|
||||
);
|
||||
break Err(err);
|
||||
} else {
|
||||
// Retry on serialization errors: these are expected, because even though our
|
||||
// transactions don't fight for the same rows, they will occasionally collide
|
||||
// on index pages (e.g. increment_generation for unrelated shards can collide)
|
||||
tracing::debug!(
|
||||
"Retrying transaction on serialization failure {err:?}"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Err(e) => break Err(e),
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("Task panic")
|
||||
}
|
||||
|
||||
/// When a node is first registered, persist it before using it for anything
|
||||
@@ -358,14 +393,11 @@ impl Persistence {
|
||||
self.with_measured_conn(
|
||||
DatabaseOperation::InsertTenantShards,
|
||||
move |conn| -> DatabaseResult<()> {
|
||||
conn.transaction(|conn| -> QueryResult<()> {
|
||||
for tenant in &shards {
|
||||
diesel::insert_into(tenant_shards)
|
||||
.values(tenant)
|
||||
.execute(conn)?;
|
||||
}
|
||||
Ok(())
|
||||
})?;
|
||||
for tenant in &shards {
|
||||
diesel::insert_into(tenant_shards)
|
||||
.values(tenant)
|
||||
.execute(conn)?;
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
)
|
||||
@@ -533,8 +565,11 @@ impl Persistence {
|
||||
let update = ShardUpdate {
|
||||
generation: input_generation.map(|g| g.into().unwrap() as i32),
|
||||
placement_policy: input_placement_policy
|
||||
.as_ref()
|
||||
.map(|p| serde_json::to_string(&p).unwrap()),
|
||||
config: input_config.map(|c| serde_json::to_string(&c).unwrap()),
|
||||
config: input_config
|
||||
.as_ref()
|
||||
.map(|c| serde_json::to_string(&c).unwrap()),
|
||||
scheduling_policy: input_scheduling_policy
|
||||
.map(|p| serde_json::to_string(&p).unwrap()),
|
||||
};
|
||||
@@ -581,55 +616,51 @@ impl Persistence {
|
||||
) -> DatabaseResult<()> {
|
||||
use crate::schema::tenant_shards::dsl::*;
|
||||
self.with_measured_conn(DatabaseOperation::BeginShardSplit, move |conn| -> DatabaseResult<()> {
|
||||
conn.transaction(|conn| -> DatabaseResult<()> {
|
||||
// Mark parent shards as splitting
|
||||
// Mark parent shards as splitting
|
||||
|
||||
let updated = diesel::update(tenant_shards)
|
||||
.filter(tenant_id.eq(split_tenant_id.to_string()))
|
||||
.filter(shard_count.eq(old_shard_count.literal() as i32))
|
||||
.set((splitting.eq(1),))
|
||||
.execute(conn)?;
|
||||
if u8::try_from(updated)
|
||||
.map_err(|_| DatabaseError::Logical(
|
||||
format!("Overflow existing shard count {} while splitting", updated))
|
||||
)? != old_shard_count.count() {
|
||||
// Perhaps a deletion or another split raced with this attempt to split, mutating
|
||||
// the parent shards that we intend to split. In this case the split request should fail.
|
||||
return Err(DatabaseError::Logical(
|
||||
format!("Unexpected existing shard count {updated} when preparing tenant for split (expected {})", old_shard_count.count())
|
||||
));
|
||||
let updated = diesel::update(tenant_shards)
|
||||
.filter(tenant_id.eq(split_tenant_id.to_string()))
|
||||
.filter(shard_count.eq(old_shard_count.literal() as i32))
|
||||
.set((splitting.eq(1),))
|
||||
.execute(conn)?;
|
||||
if u8::try_from(updated)
|
||||
.map_err(|_| DatabaseError::Logical(
|
||||
format!("Overflow existing shard count {} while splitting", updated))
|
||||
)? != old_shard_count.count() {
|
||||
// Perhaps a deletion or another split raced with this attempt to split, mutating
|
||||
// the parent shards that we intend to split. In this case the split request should fail.
|
||||
return Err(DatabaseError::Logical(
|
||||
format!("Unexpected existing shard count {updated} when preparing tenant for split (expected {})", old_shard_count.count())
|
||||
));
|
||||
}
|
||||
|
||||
// FIXME: spurious clone to sidestep closure move rules
|
||||
let parent_to_children = parent_to_children.clone();
|
||||
|
||||
// Insert child shards
|
||||
for (parent_shard_id, children) in parent_to_children {
|
||||
let mut parent = crate::schema::tenant_shards::table
|
||||
.filter(tenant_id.eq(parent_shard_id.tenant_id.to_string()))
|
||||
.filter(shard_number.eq(parent_shard_id.shard_number.0 as i32))
|
||||
.filter(shard_count.eq(parent_shard_id.shard_count.literal() as i32))
|
||||
.load::<TenantShardPersistence>(conn)?;
|
||||
let parent = if parent.len() != 1 {
|
||||
return Err(DatabaseError::Logical(format!(
|
||||
"Parent shard {parent_shard_id} not found"
|
||||
)));
|
||||
} else {
|
||||
parent.pop().unwrap()
|
||||
};
|
||||
for mut shard in children {
|
||||
// Carry the parent's generation into the child
|
||||
shard.generation = parent.generation;
|
||||
|
||||
debug_assert!(shard.splitting == SplitState::Splitting);
|
||||
diesel::insert_into(tenant_shards)
|
||||
.values(shard)
|
||||
.execute(conn)?;
|
||||
}
|
||||
|
||||
// FIXME: spurious clone to sidestep closure move rules
|
||||
let parent_to_children = parent_to_children.clone();
|
||||
|
||||
// Insert child shards
|
||||
for (parent_shard_id, children) in parent_to_children {
|
||||
let mut parent = crate::schema::tenant_shards::table
|
||||
.filter(tenant_id.eq(parent_shard_id.tenant_id.to_string()))
|
||||
.filter(shard_number.eq(parent_shard_id.shard_number.0 as i32))
|
||||
.filter(shard_count.eq(parent_shard_id.shard_count.literal() as i32))
|
||||
.load::<TenantShardPersistence>(conn)?;
|
||||
let parent = if parent.len() != 1 {
|
||||
return Err(DatabaseError::Logical(format!(
|
||||
"Parent shard {parent_shard_id} not found"
|
||||
)));
|
||||
} else {
|
||||
parent.pop().unwrap()
|
||||
};
|
||||
for mut shard in children {
|
||||
// Carry the parent's generation into the child
|
||||
shard.generation = parent.generation;
|
||||
|
||||
debug_assert!(shard.splitting == SplitState::Splitting);
|
||||
diesel::insert_into(tenant_shards)
|
||||
.values(shard)
|
||||
.execute(conn)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
@@ -647,22 +678,18 @@ impl Persistence {
|
||||
self.with_measured_conn(
|
||||
DatabaseOperation::CompleteShardSplit,
|
||||
move |conn| -> DatabaseResult<()> {
|
||||
conn.transaction(|conn| -> QueryResult<()> {
|
||||
// Drop parent shards
|
||||
diesel::delete(tenant_shards)
|
||||
.filter(tenant_id.eq(split_tenant_id.to_string()))
|
||||
.filter(shard_count.eq(old_shard_count.literal() as i32))
|
||||
.execute(conn)?;
|
||||
// Drop parent shards
|
||||
diesel::delete(tenant_shards)
|
||||
.filter(tenant_id.eq(split_tenant_id.to_string()))
|
||||
.filter(shard_count.eq(old_shard_count.literal() as i32))
|
||||
.execute(conn)?;
|
||||
|
||||
// Clear sharding flag
|
||||
let updated = diesel::update(tenant_shards)
|
||||
.filter(tenant_id.eq(split_tenant_id.to_string()))
|
||||
.set((splitting.eq(0),))
|
||||
.execute(conn)?;
|
||||
debug_assert!(updated > 0);
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
// Clear sharding flag
|
||||
let updated = diesel::update(tenant_shards)
|
||||
.filter(tenant_id.eq(split_tenant_id.to_string()))
|
||||
.set((splitting.eq(0),))
|
||||
.execute(conn)?;
|
||||
debug_assert!(updated > 0);
|
||||
|
||||
Ok(())
|
||||
},
|
||||
@@ -681,39 +708,34 @@ impl Persistence {
|
||||
self.with_measured_conn(
|
||||
DatabaseOperation::AbortShardSplit,
|
||||
move |conn| -> DatabaseResult<AbortShardSplitStatus> {
|
||||
let aborted =
|
||||
conn.transaction(|conn| -> DatabaseResult<AbortShardSplitStatus> {
|
||||
// Clear the splitting state on parent shards
|
||||
let updated = diesel::update(tenant_shards)
|
||||
.filter(tenant_id.eq(split_tenant_id.to_string()))
|
||||
.filter(shard_count.ne(new_shard_count.literal() as i32))
|
||||
.set((splitting.eq(0),))
|
||||
.execute(conn)?;
|
||||
// Clear the splitting state on parent shards
|
||||
let updated = diesel::update(tenant_shards)
|
||||
.filter(tenant_id.eq(split_tenant_id.to_string()))
|
||||
.filter(shard_count.ne(new_shard_count.literal() as i32))
|
||||
.set((splitting.eq(0),))
|
||||
.execute(conn)?;
|
||||
|
||||
// Parent shards are already gone: we cannot abort.
|
||||
if updated == 0 {
|
||||
return Ok(AbortShardSplitStatus::Complete);
|
||||
}
|
||||
// Parent shards are already gone: we cannot abort.
|
||||
if updated == 0 {
|
||||
return Ok(AbortShardSplitStatus::Complete);
|
||||
}
|
||||
|
||||
// Sanity check: if parent shards were present, their cardinality should
|
||||
// be less than the number of child shards.
|
||||
if updated >= new_shard_count.count() as usize {
|
||||
return Err(DatabaseError::Logical(format!(
|
||||
"Unexpected parent shard count {updated} while aborting split to \
|
||||
// Sanity check: if parent shards were present, their cardinality should
|
||||
// be less than the number of child shards.
|
||||
if updated >= new_shard_count.count() as usize {
|
||||
return Err(DatabaseError::Logical(format!(
|
||||
"Unexpected parent shard count {updated} while aborting split to \
|
||||
count {new_shard_count:?} on tenant {split_tenant_id}"
|
||||
)));
|
||||
}
|
||||
)));
|
||||
}
|
||||
|
||||
// Erase child shards
|
||||
diesel::delete(tenant_shards)
|
||||
.filter(tenant_id.eq(split_tenant_id.to_string()))
|
||||
.filter(shard_count.eq(new_shard_count.literal() as i32))
|
||||
.execute(conn)?;
|
||||
// Erase child shards
|
||||
diesel::delete(tenant_shards)
|
||||
.filter(tenant_id.eq(split_tenant_id.to_string()))
|
||||
.filter(shard_count.eq(new_shard_count.literal() as i32))
|
||||
.execute(conn)?;
|
||||
|
||||
Ok(AbortShardSplitStatus::Aborted)
|
||||
})?;
|
||||
|
||||
Ok(aborted)
|
||||
Ok(AbortShardSplitStatus::Aborted)
|
||||
},
|
||||
)
|
||||
.await
|
||||
|
||||
Reference in New Issue
Block a user