From 223810fd79e62986603048439226f6659724cc37 Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 4 Mar 2024 17:58:27 +0000 Subject: [PATCH] control_plane: revise compute_hook locking (don't serialize all calls) --- .../attachment_service/src/compute_hook.rs | 90 ++++++++++++++----- 1 file changed, 66 insertions(+), 24 deletions(-) diff --git a/control_plane/attachment_service/src/compute_hook.rs b/control_plane/attachment_service/src/compute_hook.rs index bebc62ac2f..e3be853911 100644 --- a/control_plane/attachment_service/src/compute_hook.rs +++ b/control_plane/attachment_service/src/compute_hook.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::{collections::HashMap, time::Duration}; use control_plane::endpoint::{ComputeControlPlane, EndpointStatus}; @@ -23,10 +24,13 @@ struct ShardedComputeHookTenant { stripe_size: ShardStripeSize, shard_count: ShardCount, shards: Vec<(ShardNumber, NodeId)>, + + // Async lock used for ensuring that remote compute hook calls are ordered identically to updates to this structure + lock: Arc<tokio::sync::Mutex<()>>, } enum ComputeHookTenant { - Unsharded(NodeId), + Unsharded((NodeId, Arc<tokio::sync::Mutex<()>>)), Sharded(ShardedComputeHookTenant), } @@ -38,9 +42,17 @@ impl ComputeHookTenant { shards: vec![(tenant_shard_id.shard_number, node_id)], stripe_size, shard_count: tenant_shard_id.shard_count, + lock: Arc::default(), }) } else { - Self::Unsharded(node_id) + Self::Unsharded((node_id, Arc::default())) + } + } + + fn get_lock(&self) -> &Arc<tokio::sync::Mutex<()>> { + match self { + Self::Unsharded((_node_id, lock)) => lock, + Self::Sharded(sharded_tenant) => &sharded_tenant.lock, } } @@ -53,7 +65,9 @@ impl ComputeHookTenant { node_id: NodeId, ) { match self { - Self::Unsharded(existing_node_id) if tenant_shard_id.shard_count.count() == 1 => { + Self::Unsharded((existing_node_id, _lock)) + if tenant_shard_id.shard_count.count() == 1 => + { *existing_node_id = node_id } Self::Sharded(sharded_tenant) @@ -122,9 +136,15 @@ pub(crate) enum NotifyError { } impl ComputeHookTenant { - fn
maybe_reconfigure(&self, tenant_id: TenantId) -> Option<ComputeHookNotifyRequest> { - match self { - Self::Unsharded(node_id) => Some(ComputeHookNotifyRequest { + fn maybe_reconfigure( + &self, + tenant_id: TenantId, + ) -> Option<( + ComputeHookNotifyRequest, + impl std::future::Future<Output = tokio::sync::OwnedMutexGuard<()>>, + )> { + let request = match self { + Self::Unsharded((node_id, _lock)) => Some(ComputeHookNotifyRequest { tenant_id, shards: vec![ComputeHookNotifyRequestShard { shard_number: ShardNumber(0), @@ -158,7 +178,9 @@ impl ComputeHookTenant { ); None } - } + }; + + request.map(|r| (r, self.get_lock().clone().lock_owned())) } } @@ -167,8 +189,11 @@ impl ComputeHookTenant { /// the compute connection string. pub(super) struct ComputeHook { config: Config, - state: tokio::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>, + state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>, authorization_header: Option<String>, + + // This lock is only used in testing environments, to serialize calls into neon_local + neon_local_lock: tokio::sync::Mutex<()>, } impl ComputeHook { @@ -182,6 +207,7 @@ impl ComputeHook { state: Default::default(), config, authorization_header, + neon_local_lock: Default::default(), } } @@ -190,6 +216,10 @@ impl ComputeHook { &self, reconfigure_request: ComputeHookNotifyRequest, ) -> anyhow::Result<()> { + // neon_local updates are not safe to call concurrently, use a lock to serialize + // all calls to this function + let _locked = self.neon_local_lock.lock().await; + let env = match LocalEnv::load_config() { Ok(e) => e, Err(e) => { @@ -340,30 +370,38 @@ impl ComputeHook { stripe_size: ShardStripeSize, cancel: &CancellationToken, ) -> Result<(), NotifyError> { - let mut locked = self.state.lock().await; + let reconfigure_request = { + let mut locked = self.state.lock().unwrap(); - use std::collections::hash_map::Entry; - let tenant = match locked.entry(tenant_shard_id.tenant_id) { - Entry::Vacant(e) => e.insert(ComputeHookTenant::new( - tenant_shard_id, - stripe_size, - node_id, - )), - Entry::Occupied(e) => { - let tenant = e.into_mut(); - tenant.update(tenant_shard_id,
stripe_size, node_id); - tenant - } + use std::collections::hash_map::Entry; + let tenant = match locked.entry(tenant_shard_id.tenant_id) { + Entry::Vacant(e) => e.insert(ComputeHookTenant::new( + tenant_shard_id, + stripe_size, + node_id, + )), + Entry::Occupied(e) => { + let tenant = e.into_mut(); + tenant.update(tenant_shard_id, stripe_size, node_id); + tenant + } + }; + + tenant.maybe_reconfigure(tenant_shard_id.tenant_id) }; - - let reconfigure_request = tenant.maybe_reconfigure(tenant_shard_id.tenant_id); - let Some(reconfigure_request) = reconfigure_request else { + let Some((reconfigure_request, lock_fut)) = reconfigure_request else { // The tenant doesn't yet have pageservers for all its shards: we won't notify anything // until it does. tracing::info!("Tenant isn't yet ready to emit a notification"); return Ok(()); }; + // Finish acquiring the tenant's async lock: this future was created inside the self.state + // lock above, so we are guaranteed to get this lock in the same order as callers took + // that lock. This ordering is essential: the cloud control plane must end up with the + // same end state for the tenant that we see. + let _guard = lock_fut.await; + if let Some(notify_url) = &self.config.compute_hook_url { self.do_notify(notify_url, reconfigure_request, cancel) .await @@ -405,6 +443,7 @@ pub(crate) mod tests { tenant_state .maybe_reconfigure(tenant_id) .unwrap() + .0 .shards .len(), 1 @@ -412,6 +451,7 @@ pub(crate) mod tests { assert!(tenant_state .maybe_reconfigure(tenant_id) .unwrap() + .0 .stripe_size .is_none()); @@ -445,6 +485,7 @@ pub(crate) mod tests { tenant_state .maybe_reconfigure(tenant_id) .unwrap() + .0 .shards .len(), 2 @@ -453,6 +494,7 @@ pub(crate) mod tests { tenant_state .maybe_reconfigure(tenant_id) .unwrap() + .0 .stripe_size, Some(ShardStripeSize(32768)) );