From acd397a5f4f4e8d6490fd8f0f87d5dd13844c2e0 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Thu, 7 Mar 2024 13:25:55 +0000
Subject: [PATCH] storage controller: add machinery for per-tenant locks

---
 .../attachment_service/src/id_lock_map.rs    | 54 +++++++++++++++++++
 control_plane/attachment_service/src/lib.rs  |  1 +
 .../attachment_service/src/service.rs        | 47 +++++++++++++++-
 3 files changed, 100 insertions(+), 2 deletions(-)
 create mode 100644 control_plane/attachment_service/src/id_lock_map.rs

diff --git a/control_plane/attachment_service/src/id_lock_map.rs b/control_plane/attachment_service/src/id_lock_map.rs
new file mode 100644
index 0000000000..b03700b50c
--- /dev/null
+++ b/control_plane/attachment_service/src/id_lock_map.rs
@@ -0,0 +1,54 @@
+use std::{collections::HashMap, sync::Arc};
+
+/// A map of locks covering some arbitrary identifiers. Useful if you have a collection of objects but don't
+/// want to embed a lock in each one, or if your locking granularity is different to your object granularity.
+/// For example, used in the storage controller where the objects are tenant shards, but sometimes locking
+/// is needed at a tenant-wide granularity.
+pub(crate) struct IdLockMap<T>
+where
+    T: Eq + PartialEq + std::hash::Hash,
+{
+    /// A synchronous lock for getting/setting the async locks that our callers will wait on.
+    entities: std::sync::Mutex<HashMap<T, Arc<tokio::sync::RwLock<()>>>>,
+}
+
+impl<T> IdLockMap<T>
+where
+    T: Eq + PartialEq + std::hash::Hash,
+{
+    pub(crate) fn shared(
+        &self,
+        key: T,
+    ) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockReadGuard<()>> {
+        let mut locked = self.entities.lock().unwrap();
+        let entry = locked.entry(key).or_default();
+        entry.clone().read_owned()
+    }
+
+    pub(crate) fn exclusive(
+        &self,
+        key: T,
+    ) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockWriteGuard<()>> {
+        let mut locked = self.entities.lock().unwrap();
+        let entry = locked.entry(key).or_default();
+        entry.clone().write_owned()
+    }
+
+    /// Rather than building a lock guard that re-takes the [`Self::entities`] lock, we just do
+    /// periodic housekeeping to avoid the map growing indefinitely
+    pub(crate) fn housekeeping(&self) {
+        let mut locked = self.entities.lock().unwrap();
+        locked.retain(|_k, lock| lock.try_write().is_err())
+    }
+}
+
+impl<T> Default for IdLockMap<T>
+where
+    T: Eq + PartialEq + std::hash::Hash,
+{
+    fn default() -> Self {
+        Self {
+            entities: std::sync::Mutex::new(HashMap::new()),
+        }
+    }
+}
diff --git a/control_plane/attachment_service/src/lib.rs b/control_plane/attachment_service/src/lib.rs
index 796b465c10..a017bc1ecc 100644
--- a/control_plane/attachment_service/src/lib.rs
+++ b/control_plane/attachment_service/src/lib.rs
@@ -4,6 +4,7 @@ use utils::seqwait::MonotonicCounter;
 mod auth;
 mod compute_hook;
 pub mod http;
+mod id_lock_map;
 pub mod metrics;
 mod node;
 pub mod persistence;
diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs
index 259551d144..1c9e5ae511 100644
--- a/control_plane/attachment_service/src/service.rs
+++ b/control_plane/attachment_service/src/service.rs
@@ -7,6 +7,7 @@ use std::{
     time::{Duration, Instant},
 };
 
+use crate::id_lock_map::IdLockMap;
 use anyhow::Context;
 use control_plane::attachment_service::{
     AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
@@ -153,6 +154,11 @@ pub struct Service {
     config: Config,
     persistence: Arc<Persistence>,
 
+    // Locking on a tenant granularity (covers all shards in the tenant):
+    // - Take exclusively for rare operations that mutate the tenant's persistent state (e.g. create/delete/split)
+    // - Take in shared mode for operations that need the set of shards to stay the same to complete reliably (e.g. timeline CRUD)
+    tenant_locks: IdLockMap<TenantId>,
+
     // Process shutdown will fire this token
     cancel: CancellationToken,
 
@@ -741,6 +747,7 @@ impl Service {
             startup_complete: startup_complete.clone(),
             cancel: CancellationToken::new(),
             gate: Gate::default(),
+            tenant_locks: Default::default(),
         });
 
         let result_task_this = this.clone();
@@ -751,6 +758,23 @@ impl Service {
             }
         });
 
+        tokio::task::spawn({
+            let this = this.clone();
+            async move {
+                if let Ok(_gate) = this.gate.enter() {
+                    loop {
+                        tokio::select! {
+                            _ = this.cancel.cancelled() => {
+                                break;
+                            },
+                            _ = tokio::time::sleep(Duration::from_secs(60)) => {}
+                        };
+                        this.tenant_locks.housekeeping();
+                    }
+                }
+            }
+        });
+
         tokio::task::spawn({
             let this = this.clone();
             // We will block the [`Service::startup_complete`] barrier until [`Self::startup_reconcile`]
@@ -1055,6 +1079,12 @@ impl Service {
         &self,
         create_req: TenantCreateRequest,
     ) -> Result<TenantCreateResponse, ApiError> {
+        // Exclude any concurrent attempts to create/access the same tenant ID
+        let _tenant_lock = self
+            .tenant_locks
+            .exclusive(create_req.new_tenant_id.tenant_id)
+            .await;
+
         let (response, waiters) = self.do_tenant_create(create_req).await?;
 
         self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await?;
@@ -1382,6 +1412,9 @@ impl Service {
         tenant_id: TenantId,
         req: TenantLocationConfigRequest,
     ) -> Result<TenantLocationConfigResponse, ApiError> {
+        // We require an exclusive lock, because we are updating both persistent and in-memory state
+        let _tenant_lock = self.tenant_locks.exclusive(tenant_id).await;
+
         if !req.tenant_id.is_unsharded() {
             return Err(ApiError::BadRequest(anyhow::anyhow!(
                 "This API is for importing single-sharded or unsharded tenants"
@@ -1505,6 +1538,9 @@ impl Service {
     }
 
     pub(crate) async fn tenant_config_set(&self, req: TenantConfigRequest) -> Result<(), ApiError> {
+        // We require an exclusive lock, because we are updating persistent and in-memory state
+        let _tenant_lock = self.tenant_locks.exclusive(req.tenant_id).await;
+
         let tenant_id = req.tenant_id;
         let config = req.config;
 
@@ -1596,6 +1632,8 @@ impl Service {
         timestamp: Cow<'_, str>,
         done_if_after: Cow<'_, str>,
     ) -> Result<(), ApiError> {
+        let _tenant_lock = self.tenant_locks.exclusive(tenant_id).await;
+
         let node = {
             let locked = self.inner.read().unwrap();
             // Just a sanity check to prevent misuse: the API expects that the tenant is fully
@@ -1681,6 +1719,8 @@ impl Service {
         &self,
         tenant_id: TenantId,
     ) -> Result<(), ApiError> {
+        let _tenant_lock = self.tenant_locks.shared(tenant_id).await;
+
         // Acquire lock and yield the collection of shard-node tuples which we will send requests onward to
         let targets = {
             let locked = self.inner.read().unwrap();
@@ -1730,6 +1770,8 @@ impl Service {
     }
 
     pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result<StatusCode, ApiError> {
+        let _tenant_lock = self.tenant_locks.exclusive(tenant_id).await;
+
         self.ensure_attached_wait(tenant_id).await?;
 
         // TODO: refactor into helper
@@ -1826,6 +1868,8 @@ impl Service {
             create_req.new_timeline_id,
         );
 
+        let _tenant_lock = self.tenant_locks.shared(tenant_id).await;
+
         self.ensure_attached_wait(tenant_id).await?;
 
         // TODO: refuse to do this if shard splitting is in progress
@@ -1951,11 +1995,10 @@ impl Service {
         timeline_id: TimelineId,
     ) -> Result<StatusCode, ApiError> {
         tracing::info!("Deleting timeline {}/{}", tenant_id, timeline_id,);
+        let _tenant_lock = self.tenant_locks.shared(tenant_id).await;
 
         self.ensure_attached_wait(tenant_id).await?;
 
-        // TODO: refuse to do this if shard splitting is in progress
-        // (https://github.com/neondatabase/neon/issues/6676)
         let mut targets = {
             let locked = self.inner.read().unwrap();
             let mut targets = Vec::new();
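
Usage note (not part of the patch): the sketch below is a self-contained approximation of how callers interact with the new lock map. It copies the core of IdLockMap with a plain u32 key standing in for TenantId, and assumes only the tokio crate (with the "full" feature); the demo names and assertions are illustrative, not code from this change. Two shared holders coexist, an exclusive waiter queues behind them, and housekeeping() prunes an entry once its lock is uncontended.

// Standalone sketch: exercises an IdLockMap-style lock map outside the controller.
// The u32 key and the demo assertions are illustrative stand-ins, not patch code.
use std::{collections::HashMap, sync::Arc, time::Duration};

/// Trimmed copy of the lock map from the patch, keyed by u32 for the demo.
#[derive(Default)]
struct IdLockMap {
    entities: std::sync::Mutex<HashMap<u32, Arc<tokio::sync::RwLock<()>>>>,
}

impl IdLockMap {
    fn shared(
        &self,
        key: u32,
    ) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockReadGuard<()>> {
        let mut locked = self.entities.lock().unwrap();
        locked.entry(key).or_default().clone().read_owned()
    }

    fn exclusive(
        &self,
        key: u32,
    ) -> impl std::future::Future<Output = tokio::sync::OwnedRwLockWriteGuard<()>> {
        let mut locked = self.entities.lock().unwrap();
        locked.entry(key).or_default().clone().write_owned()
    }

    fn housekeeping(&self) {
        // Drop entries whose lock is currently uncontended.
        self.entities
            .lock()
            .unwrap()
            .retain(|_k, lock| lock.try_write().is_err());
    }

    fn len(&self) -> usize {
        self.entities.lock().unwrap().len()
    }
}

#[tokio::main]
async fn main() {
    let locks = Arc::new(IdLockMap::default());

    // Two shared holders of tenant 1 coexist; an exclusive waiter must wait for both.
    let s1 = locks.shared(1).await;
    let s2 = locks.shared(1).await;

    let waiter = tokio::spawn({
        let locks = locks.clone();
        async move {
            let _excl = locks.exclusive(1).await;
            println!("exclusive acquired after shared guards dropped");
        }
    });

    tokio::time::sleep(Duration::from_millis(50)).await;
    assert!(!waiter.is_finished()); // still queued behind the shared guards
    drop(s1);
    drop(s2);
    waiter.await.unwrap();

    // With nothing held, housekeeping prunes the entry so the map cannot grow forever.
    assert_eq!(locks.len(), 1);
    locks.housekeeping();
    assert_eq!(locks.len(), 0);
}

Returning the owned-guard future while only briefly holding the synchronous map mutex is what lets the map hand out 'static guards without keeping the HashMap lock held across an await.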