storage controller: add machinery for per-tenant locks

Author: John Spray
Date:   2024-03-07 13:25:55 +00:00
Parent: 25bd74fd6a
Commit: acd397a5f4

3 changed files with 100 additions and 2 deletions


@@ -0,0 +1,54 @@
use std::{collections::HashMap, sync::Arc};

/// A map of locks covering some arbitrary identifiers. Useful if you have a collection of objects but don't
/// want to embed a lock in each one, or if your locking granularity is different to your object granularity.
/// For example, used in the storage controller where the objects are tenant shards, but sometimes locking
/// is needed at a tenant-wide granularity.
pub(crate) struct IdLockMap<T>
where
    T: Eq + PartialEq + std::hash::Hash,
{
    /// A synchronous lock for getting/setting the async locks that our callers will wait on.
    entities: std::sync::Mutex<std::collections::HashMap<T, Arc<tokio::sync::RwLock<()>>>>,
}

impl<T> IdLockMap<T>
where
    T: Eq + PartialEq + std::hash::Hash,
{
    pub(crate) fn shared(
        &self,
        key: T,
    ) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockReadGuard<()>> {
        let mut locked = self.entities.lock().unwrap();
        let entry = locked.entry(key).or_default();
        entry.clone().read_owned()
    }

    pub(crate) fn exclusive(
        &self,
        key: T,
    ) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockWriteGuard<()>> {
        let mut locked = self.entities.lock().unwrap();
        let entry = locked.entry(key).or_default();
        entry.clone().write_owned()
    }

    /// Rather than building a lock guard that re-takes the [`Self::entities`] lock, we just do
    /// periodic housekeeping to avoid the map growing indefinitely
    pub(crate) fn housekeeping(&self) {
        let mut locked = self.entities.lock().unwrap();
        locked.retain(|_k, lock| lock.try_write().is_err())
    }
}

impl<T> Default for IdLockMap<T>
where
    T: Eq + PartialEq + std::hash::Hash,
{
    fn default() -> Self {
        Self {
            entities: std::sync::Mutex::new(HashMap::new()),
        }
    }
}
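For orientation, here is a minimal usage sketch of IdLockMap; it is not part of this commit. It assumes a tokio test runtime with timers enabled and that the test module sits alongside the type (so the pub(crate) items are visible); the u64 keys stand in for the TenantId keys the controller actually uses, and the module/test names are hypothetical.

#[cfg(test)]
mod usage_sketch {
    use super::*;
    use std::time::Duration;

    #[tokio::test]
    async fn shared_and_exclusive() {
        let locks: IdLockMap<u64> = Default::default();

        // Any number of shared guards for the same key may be held at once.
        let read_a = locks.shared(1).await;
        let read_b = locks.shared(1).await;

        // An exclusive guard for that key cannot be acquired until every shared guard drops.
        tokio::select! {
            _ = locks.exclusive(1) => panic!("write acquired while read guards are live"),
            _ = tokio::time::sleep(Duration::from_millis(50)) => {}
        }

        drop(read_a);
        drop(read_b);
        let _write = locks.exclusive(1).await;

        // Different keys are independent locks, so this does not block.
        let _other = locks.exclusive(2).await;
    }
}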


@@ -4,6 +4,7 @@ use utils::seqwait::MonotonicCounter;
mod auth;
mod compute_hook;
pub mod http;
mod id_lock_map;
pub mod metrics;
mod node;
pub mod persistence;


@@ -7,6 +7,7 @@ use std::{
    time::{Duration, Instant},
};

use crate::id_lock_map::IdLockMap;
use anyhow::Context;
use control_plane::attachment_service::{
    AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
@@ -153,6 +154,11 @@ pub struct Service {
    config: Config,
    persistence: Arc<Persistence>,
    // Locking on a tenant granularity (covers all shards in the tenant):
    // - Take exclusively for rare operations that mutate the tenant's persistent state (e.g. create/delete/split)
    // - Take in shared mode for operations that need the set of shards to stay the same to complete reliably (e.g. timeline CRUD)
    tenant_locks: IdLockMap<TenantId>,

    // Process shutdown will fire this token
    cancel: CancellationToken,
@@ -741,6 +747,7 @@ impl Service {
            startup_complete: startup_complete.clone(),
            cancel: CancellationToken::new(),
            gate: Gate::default(),
            tenant_locks: Default::default(),
        });

        let result_task_this = this.clone();
@@ -751,6 +758,23 @@ impl Service {
            }
        });

        tokio::task::spawn({
            let this = this.clone();
            async move {
                if let Ok(_gate) = this.gate.enter() {
                    loop {
                        tokio::select! {
                            _ = this.cancel.cancelled() => {
                                break;
                            },
                            _ = tokio::time::sleep(Duration::from_secs(60)) => {}
                        };
                        this.tenant_locks.housekeeping();
                    }
                }
            }
        });
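Aside, not part of the commit: a sketch of what the periodic housekeeping() call above achieves. try_write() only fails while some guard is live, so retain() keeps entries that are currently held and drops idle ones; a dropped entry is simply recreated on the next shared()/exclusive() call. Same assumptions as the earlier sketch (tokio test runtime, u64 keys, hypothetical test name).

#[tokio::test]
async fn housekeeping_drops_idle_entries() {
    let locks: IdLockMap<u64> = Default::default();

    // Key 1: keep its guard alive. Key 2: take and immediately drop a guard.
    let _held = locks.exclusive(1).await;
    drop(locks.shared(2).await);

    // Entry 2 is idle (try_write succeeds), so it is removed; entry 1 survives
    // because its write guard is still held.
    locks.housekeeping();

    // Key 2 just gets a fresh lock on next use; key 1 would still block here
    // until `_held` is dropped.
    let _fresh = locks.exclusive(2).await;
}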
        tokio::task::spawn({
            let this = this.clone();
            // We will block the [`Service::startup_complete`] barrier until [`Self::startup_reconcile`]
@@ -1055,6 +1079,12 @@ impl Service {
        &self,
        create_req: TenantCreateRequest,
    ) -> Result<TenantCreateResponse, ApiError> {
        // Exclude any concurrent attempts to create/access the same tenant ID
        let _tenant_lock = self
            .tenant_locks
            .exclusive(create_req.new_tenant_id.tenant_id)
            .await;

        let (response, waiters) = self.do_tenant_create(create_req).await?;
        self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await?;
@@ -1382,6 +1412,9 @@ impl Service {
        tenant_id: TenantId,
        req: TenantLocationConfigRequest,
    ) -> Result<TenantLocationConfigResponse, ApiError> {
        // We require an exclusive lock, because we are updating both persistent and in-memory state
        let _tenant_lock = self.tenant_locks.exclusive(tenant_id).await;

        if !req.tenant_id.is_unsharded() {
            return Err(ApiError::BadRequest(anyhow::anyhow!(
                "This API is for importing single-sharded or unsharded tenants"
@@ -1505,6 +1538,9 @@ impl Service {
    }

    pub(crate) async fn tenant_config_set(&self, req: TenantConfigRequest) -> Result<(), ApiError> {
        // We require an exclusive lock, because we are updating persistent and in-memory state
        let _tenant_lock = self.tenant_locks.exclusive(req.tenant_id).await;

        let tenant_id = req.tenant_id;
        let config = req.config;
@@ -1596,6 +1632,8 @@ impl Service {
        timestamp: Cow<'_, str>,
        done_if_after: Cow<'_, str>,
    ) -> Result<(), ApiError> {
        let _tenant_lock = self.tenant_locks.exclusive(tenant_id).await;

        let node = {
            let locked = self.inner.read().unwrap();
            // Just a sanity check to prevent misuse: the API expects that the tenant is fully
@@ -1681,6 +1719,8 @@ impl Service {
        &self,
        tenant_id: TenantId,
    ) -> Result<(), ApiError> {
        let _tenant_lock = self.tenant_locks.shared(tenant_id).await;

        // Acquire lock and yield the collection of shard-node tuples which we will send requests onward to
        let targets = {
            let locked = self.inner.read().unwrap();
@@ -1730,6 +1770,8 @@ impl Service {
    }

    pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result<StatusCode, ApiError> {
        let _tenant_lock = self.tenant_locks.exclusive(tenant_id).await;

        self.ensure_attached_wait(tenant_id).await?;

        // TODO: refactor into helper
@@ -1826,6 +1868,8 @@ impl Service {
            create_req.new_timeline_id,
        );
        let _tenant_lock = self.tenant_locks.shared(tenant_id).await;

        self.ensure_attached_wait(tenant_id).await?;

        // TODO: refuse to do this if shard splitting is in progress
@@ -1951,11 +1995,10 @@ impl Service {
        timeline_id: TimelineId,
    ) -> Result<StatusCode, ApiError> {
        tracing::info!("Deleting timeline {}/{}", tenant_id, timeline_id,);
        let _tenant_lock = self.tenant_locks.shared(tenant_id).await;

        self.ensure_attached_wait(tenant_id).await?;

        // TODO: refuse to do this if shard splitting is in progress
        // (https://github.com/neondatabase/neon/issues/6676)
        let mut targets = {
            let locked = self.inner.read().unwrap();
            let mut targets = Vec::new();