control_plane: revise compute_hook locking (don't serialize all calls)

This commit is contained in:
John Spray
2024-03-04 17:58:27 +00:00
parent 2baac6f6e6
commit 223810fd79

View File

@@ -1,3 +1,4 @@
use std::sync::Arc;
use std::{collections::HashMap, time::Duration};
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
@@ -23,10 +24,13 @@ struct ShardedComputeHookTenant {
stripe_size: ShardStripeSize,
shard_count: ShardCount,
shards: Vec<(ShardNumber, NodeId)>,
// Async lock used for ensuring that remote compute hook calls are ordered identically to updates to this structure
lock: Arc<tokio::sync::Mutex<()>>,
}
enum ComputeHookTenant {
Unsharded(NodeId),
Unsharded((NodeId, Arc<tokio::sync::Mutex<()>>)),
Sharded(ShardedComputeHookTenant),
}
@@ -38,9 +42,17 @@ impl ComputeHookTenant {
shards: vec![(tenant_shard_id.shard_number, node_id)],
stripe_size,
shard_count: tenant_shard_id.shard_count,
lock: Arc::default(),
})
} else {
Self::Unsharded(node_id)
Self::Unsharded((node_id, Arc::default()))
}
}
fn get_lock(&self) -> &Arc<tokio::sync::Mutex<()>> {
match self {
Self::Unsharded((_node_id, lock)) => lock,
Self::Sharded(sharded_tenant) => &sharded_tenant.lock,
}
}
@@ -53,7 +65,9 @@ impl ComputeHookTenant {
node_id: NodeId,
) {
match self {
Self::Unsharded(existing_node_id) if tenant_shard_id.shard_count.count() == 1 => {
Self::Unsharded((existing_node_id, _lock))
if tenant_shard_id.shard_count.count() == 1 =>
{
*existing_node_id = node_id
}
Self::Sharded(sharded_tenant)
@@ -122,9 +136,15 @@ pub(crate) enum NotifyError {
}
impl ComputeHookTenant {
fn maybe_reconfigure(&self, tenant_id: TenantId) -> Option<ComputeHookNotifyRequest> {
match self {
Self::Unsharded(node_id) => Some(ComputeHookNotifyRequest {
fn maybe_reconfigure(
&self,
tenant_id: TenantId,
) -> Option<(
ComputeHookNotifyRequest,
impl std::future::Future<Output = tokio::sync::OwnedMutexGuard<()>>,
)> {
let request = match self {
Self::Unsharded((node_id, _lock)) => Some(ComputeHookNotifyRequest {
tenant_id,
shards: vec![ComputeHookNotifyRequestShard {
shard_number: ShardNumber(0),
@@ -158,7 +178,9 @@ impl ComputeHookTenant {
);
None
}
}
};
request.map(|r| (r, self.get_lock().clone().lock_owned()))
}
}
@@ -167,8 +189,11 @@ impl ComputeHookTenant {
/// the compute connection string.
pub(super) struct ComputeHook {
config: Config,
state: tokio::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
authorization_header: Option<String>,
// This lock is only used in testing enviroments, to serialize calls into neon_lock
neon_local_lock: tokio::sync::Mutex<()>,
}
impl ComputeHook {
@@ -182,6 +207,7 @@ impl ComputeHook {
state: Default::default(),
config,
authorization_header,
neon_local_lock: Default::default(),
}
}
@@ -190,6 +216,10 @@ impl ComputeHook {
&self,
reconfigure_request: ComputeHookNotifyRequest,
) -> anyhow::Result<()> {
// neon_local updates are not safe to call concurrently, use a lock to serialize
// all calls to this function
let _locked = self.neon_local_lock.lock().await;
let env = match LocalEnv::load_config() {
Ok(e) => e,
Err(e) => {
@@ -340,30 +370,38 @@ impl ComputeHook {
stripe_size: ShardStripeSize,
cancel: &CancellationToken,
) -> Result<(), NotifyError> {
let mut locked = self.state.lock().await;
let reconfigure_request = {
let mut locked = self.state.lock().unwrap();
use std::collections::hash_map::Entry;
let tenant = match locked.entry(tenant_shard_id.tenant_id) {
Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
tenant_shard_id,
stripe_size,
node_id,
)),
Entry::Occupied(e) => {
let tenant = e.into_mut();
tenant.update(tenant_shard_id, stripe_size, node_id);
tenant
}
use std::collections::hash_map::Entry;
let tenant = match locked.entry(tenant_shard_id.tenant_id) {
Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
tenant_shard_id,
stripe_size,
node_id,
)),
Entry::Occupied(e) => {
let tenant = e.into_mut();
tenant.update(tenant_shard_id, stripe_size, node_id);
tenant
}
};
tenant.maybe_reconfigure(tenant_shard_id.tenant_id)
};
let reconfigure_request = tenant.maybe_reconfigure(tenant_shard_id.tenant_id);
let Some(reconfigure_request) = reconfigure_request else {
let Some((reconfigure_request, lock_fut)) = reconfigure_request else {
// The tenant doesn't yet have pageservers for all its shards: we won't notify anything
// until it does.
tracing::info!("Tenant isn't yet ready to emit a notification");
return Ok(());
};
// Finish acquiring the tenant's async lock: this future was created inside the self.state
// lock above, so we are guaranteed to get this lock in the same order as callers took
// that lock. This ordering is essential: the cloud control plane must end up with the
// same end state for the tenant that we see.
let _guard = lock_fut.await;
if let Some(notify_url) = &self.config.compute_hook_url {
self.do_notify(notify_url, reconfigure_request, cancel)
.await
@@ -405,6 +443,7 @@ pub(crate) mod tests {
tenant_state
.maybe_reconfigure(tenant_id)
.unwrap()
.0
.shards
.len(),
1
@@ -412,6 +451,7 @@ pub(crate) mod tests {
assert!(tenant_state
.maybe_reconfigure(tenant_id)
.unwrap()
.0
.stripe_size
.is_none());
@@ -445,6 +485,7 @@ pub(crate) mod tests {
tenant_state
.maybe_reconfigure(tenant_id)
.unwrap()
.0
.shards
.len(),
2
@@ -453,6 +494,7 @@ pub(crate) mod tests {
tenant_state
.maybe_reconfigure(tenant_id)
.unwrap()
.0
.stripe_size,
Some(ShardStripeSize(32768))
);