diff --git a/storage_controller/src/compute_hook.rs b/storage_controller/src/compute_hook.rs
index 1a8dc6b86d..eb0c4472e4 100644
--- a/storage_controller/src/compute_hook.rs
+++ b/storage_controller/src/compute_hook.rs
@@ -1,3 +1,4 @@
+use std::sync::Arc;
 use std::{collections::HashMap, time::Duration};
 
 use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
@@ -18,14 +19,26 @@ const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);
 
 pub(crate) const API_CONCURRENCY: usize = 32;
 
+struct UnshardedComputeHookTenant {
+    // Which node is this tenant attached to
+    node_id: NodeId,
+
+    // Must hold this lock to send a notification.
+    send_lock: Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>,
+}
 struct ShardedComputeHookTenant {
     stripe_size: ShardStripeSize,
     shard_count: ShardCount,
     shards: Vec<(ShardNumber, NodeId)>,
+
+    // Must hold this lock to send a notification. The contents represent
+    // the last successfully sent notification, and are used to coalesce multiple
+    // updates by only sending when there is a change since our last successful send.
+    send_lock: Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>,
 }
 
 enum ComputeHookTenant {
-    Unsharded(NodeId),
+    Unsharded(UnshardedComputeHookTenant),
     Sharded(ShardedComputeHookTenant),
 }
 
@@ -37,9 +50,20 @@ impl ComputeHookTenant {
                 shards: vec![(tenant_shard_id.shard_number, node_id)],
                 stripe_size,
                 shard_count: tenant_shard_id.shard_count,
+                send_lock: Arc::default(),
             })
         } else {
-            Self::Unsharded(node_id)
+            Self::Unsharded(UnshardedComputeHookTenant {
+                node_id,
+                send_lock: Arc::default(),
+            })
+        }
+    }
+
+    fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>> {
+        match self {
+            Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock,
+            Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock,
         }
     }
 
@@ -52,8 +76,8 @@ impl ComputeHookTenant {
         node_id: NodeId,
     ) {
         match self {
-            Self::Unsharded(existing_node_id) if tenant_shard_id.shard_count.count() == 1 => {
-                *existing_node_id = node_id
+            Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
+                unsharded_tenant.node_id = node_id
             }
             Self::Sharded(sharded_tenant)
                 if sharded_tenant.stripe_size == stripe_size
@@ -80,14 +104,14 @@ impl ComputeHookTenant {
     }
 }
 
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
 struct ComputeHookNotifyRequestShard {
     node_id: NodeId,
     shard_number: ShardNumber,
 }
 
 /// Request body that we send to the control plane to notify it of where a tenant is attached
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
 struct ComputeHookNotifyRequest {
     tenant_id: TenantId,
     stripe_size: Option<ShardStripeSize>,
@@ -120,14 +144,44 @@ pub(crate) enum NotifyError {
     Fatal(StatusCode),
 }
 
+enum MaybeSendResult {
+    // Please send this request while holding the lock, and if you succeed then write
+    // the request into the lock.
+    Transmit(
+        (
+            ComputeHookNotifyRequest,
+            tokio::sync::OwnedMutexGuard<Option<ComputeHookNotifyRequest>>,
+        ),
+    ),
+    // Something requires sending, but you must wait for a current sender then call again
+    AwaitLock(Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>),
+    // Nothing requires sending
+    Noop,
+}
+
 impl ComputeHookTenant {
-    fn maybe_reconfigure(&self, tenant_id: TenantId) -> Option<ComputeHookNotifyRequest> {
-        match self {
-            Self::Unsharded(node_id) => Some(ComputeHookNotifyRequest {
+    fn maybe_send(
+        &self,
+        tenant_id: TenantId,
+        lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeHookNotifyRequest>>>,
+    ) -> MaybeSendResult {
+        let locked = match lock {
+            Some(already_locked) => already_locked,
+            None => {
+                // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::state`] lock.
+                let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else {
+                    return MaybeSendResult::AwaitLock(self.get_send_lock().clone());
+                };
+                locked
+            }
+        };
+
+        let request = match self {
+            Self::Unsharded(unsharded_tenant) => Some(ComputeHookNotifyRequest {
                 tenant_id,
                 shards: vec![ComputeHookNotifyRequestShard {
                     shard_number: ShardNumber(0),
-                    node_id: *node_id,
+                    node_id: unsharded_tenant.node_id,
                 }],
                 stripe_size: None,
             }),
@@ -151,12 +205,25 @@ impl ComputeHookTenant {
 
                 // Sharded tenant doesn't yet have information for all its shards
                 tracing::info!(
-                    "ComputeHookTenant::maybe_reconfigure: not enough shards ({}/{})",
+                    "ComputeHookTenant::maybe_send: not enough shards ({}/{})",
                     sharded_tenant.shards.len(),
                     sharded_tenant.shard_count.count()
                 );
                 None
             }
+        };
+
+        match request {
+            None => {
+                // Not yet ready to emit a notification
+                tracing::info!("Tenant isn't yet ready to emit a notification");
+                MaybeSendResult::Noop
+            }
+            Some(request) if Some(&request) == locked.as_ref() => {
+                // No change from the last value successfully sent
+                MaybeSendResult::Noop
+            }
+            Some(request) => MaybeSendResult::Transmit((request, locked)),
         }
     }
 }
@@ -166,8 +233,15 @@ impl ComputeHookTenant {
 /// the compute connection string.
 pub(super) struct ComputeHook {
     config: Config,
-    state: tokio::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
+    state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
     authorization_header: Option<String>,
+
+    // Concurrency limiter, so that we do not overload the cloud control plane when updating
+    // large numbers of tenants (e.g. when failing over after a node failure)
+    api_concurrency: tokio::sync::Semaphore,
+
+    // This lock is only used in testing environments, to serialize calls into neon_local
+    neon_local_lock: tokio::sync::Mutex<()>,
 }
 
 impl ComputeHook {
@@ -181,14 +255,20 @@ impl ComputeHook {
             state: Default::default(),
             config,
             authorization_header,
+            neon_local_lock: Default::default(),
+            api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
         }
     }
 
     /// For test environments: use neon_local's LocalEnv to update compute
     async fn do_notify_local(
         &self,
-        reconfigure_request: ComputeHookNotifyRequest,
+        reconfigure_request: &ComputeHookNotifyRequest,
     ) -> anyhow::Result<()> {
+        // neon_local updates are not safe to call concurrently, use a lock to serialize
+        // all calls to this function
+        let _locked = self.neon_local_lock.lock().await;
+
         let env = match LocalEnv::load_config() {
             Ok(e) => e,
             Err(e) => {
@@ -205,7 +285,7 @@ impl ComputeHook {
         } = reconfigure_request;
 
         let compute_pageservers = shards
-            .into_iter()
+            .iter()
             .map(|shard| {
                 let ps_conf = env
                     .get_pageserver_conf(shard.node_id)
@@ -217,10 +297,10 @@ impl ComputeHook {
             .collect::<Vec<_>>();
 
         for (endpoint_name, endpoint) in &cplane.endpoints {
-            if endpoint.tenant_id == tenant_id && endpoint.status() == EndpointStatus::Running {
+            if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
                 tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
                 endpoint
-                    .reconfigure(compute_pageservers.clone(), stripe_size)
+                    .reconfigure(compute_pageservers.clone(), *stripe_size)
                     .await?;
             }
         }
@@ -298,12 +378,23 @@ impl ComputeHook {
     async fn do_notify(
         &self,
         url: &String,
-        reconfigure_request: ComputeHookNotifyRequest,
+        reconfigure_request: &ComputeHookNotifyRequest,
         cancel: &CancellationToken,
     ) -> Result<(), NotifyError> {
         let client = reqwest::Client::new();
+
+        // We hold these semaphore units across all retries, rather than only across each
+        // HTTP request: this is to preserve fairness and avoid a situation where a retry might
+        // time out waiting for a semaphore.
+        let _units = self
+            .api_concurrency
+            .acquire()
+            .await
+            // Interpret closed semaphore as shutdown
+            .map_err(|_| NotifyError::ShuttingDown)?;
+
         backoff::retry(
-            || self.do_notify_iteration(&client, url, &reconfigure_request, cancel),
+            || self.do_notify_iteration(&client, url, reconfigure_request, cancel),
             |e| {
                 matches!(
                     e,
@@ -343,42 +434,70 @@ impl ComputeHook {
         stripe_size: ShardStripeSize,
         cancel: &CancellationToken,
     ) -> Result<(), NotifyError> {
-        let mut locked = self.state.lock().await;
+        let maybe_send_result = {
+            let mut state_locked = self.state.lock().unwrap();
 
-        use std::collections::hash_map::Entry;
-        let tenant = match locked.entry(tenant_shard_id.tenant_id) {
-            Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
-                tenant_shard_id,
-                stripe_size,
-                node_id,
-            )),
-            Entry::Occupied(e) => {
-                let tenant = e.into_mut();
-                tenant.update(tenant_shard_id, stripe_size, node_id);
-                tenant
+            use std::collections::hash_map::Entry;
+            let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
+                Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
+                    tenant_shard_id,
+                    stripe_size,
+                    node_id,
+                )),
+                Entry::Occupied(e) => {
+                    let tenant = e.into_mut();
+                    tenant.update(tenant_shard_id, stripe_size, node_id);
+                    tenant
+                }
+            };
+            tenant.maybe_send(tenant_shard_id.tenant_id, None)
+        };
+
+        // Process result: we may get an update to send, or we may have to wait for a lock
+        // before trying again.
+        let (request, mut send_lock_guard) = match maybe_send_result {
+            MaybeSendResult::Noop => {
+                return Ok(());
             }
+            MaybeSendResult::AwaitLock(send_lock) => {
+                let send_locked = send_lock.lock_owned().await;
+
+                // Lock order: maybe_send is called within the [`Self::state`] lock and takes the send lock, but here
+                // we have acquired the send lock and then take the [`Self::state`] lock. This is safe because
+                // maybe_send only uses try_lock.
+                let state_locked = self.state.lock().unwrap();
+                let Some(tenant) = state_locked.get(&tenant_shard_id.tenant_id) else {
+                    return Ok(());
+                };
+                match tenant.maybe_send(tenant_shard_id.tenant_id, Some(send_locked)) {
+                    MaybeSendResult::AwaitLock(_) => {
+                        unreachable!("We supplied lock guard")
+                    }
+                    MaybeSendResult::Noop => {
+                        return Ok(());
+                    }
+                    MaybeSendResult::Transmit((request, lock)) => (request, lock),
+                }
+            }
+            MaybeSendResult::Transmit((request, lock)) => (request, lock),
         };
 
-        let reconfigure_request = tenant.maybe_reconfigure(tenant_shard_id.tenant_id);
-        let Some(reconfigure_request) = reconfigure_request else {
-            // The tenant doesn't yet have pageservers for all its shards: we won't notify anything
-            // until it does.
-            tracing::info!("Tenant isn't yet ready to emit a notification");
-            return Ok(());
-        };
-
-        if let Some(notify_url) = &self.config.compute_hook_url {
-            self.do_notify(notify_url, reconfigure_request, cancel)
-                .await
+        let result = if let Some(notify_url) = &self.config.compute_hook_url {
+            self.do_notify(notify_url, &request, cancel).await
         } else {
-            self.do_notify_local(reconfigure_request)
-                .await
-                .map_err(|e| {
-                    // This path is for testing only, so munge the error into our prod-style error type.
-                    tracing::error!("Local notification hook failed: {e}");
-                    NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)
-                })
+            self.do_notify_local(&request).await.map_err(|e| {
+                // This path is for testing only, so munge the error into our prod-style error type.
+                tracing::error!("Local notification hook failed: {e}");
+                NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)
+            })
+        };
+
+        if result.is_ok() {
+            // Before dropping the send lock, stash the request we just sent so that
+            // subsequent callers can avoid redundantly re-sending the same thing.
+            *send_lock_guard = Some(request);
         }
+
+        result
     }
 }
 
@@ -402,21 +521,22 @@ pub(crate) mod tests {
             NodeId(1),
         );
 
-        // An unsharded tenant is always ready to emit a notification
-        assert!(tenant_state.maybe_reconfigure(tenant_id).is_some());
-        assert_eq!(
-            tenant_state
-                .maybe_reconfigure(tenant_id)
-                .unwrap()
-                .shards
-                .len(),
-            1
-        );
-        assert!(tenant_state
-            .maybe_reconfigure(tenant_id)
-            .unwrap()
-            .stripe_size
-            .is_none());
+        // An unsharded tenant is always ready to emit a notification, but won't
+        // send the same one twice
+        let send_result = tenant_state.maybe_send(tenant_id, None);
+        let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
+            anyhow::bail!("Wrong send result");
+        };
+        assert_eq!(request.shards.len(), 1);
+        assert!(request.stripe_size.is_none());
+
+        // Simulate successful send
+        *guard = Some(request);
+        drop(guard);
+
+        // Try asking again: this should be a no-op
+        let send_result = tenant_state.maybe_send(tenant_id, None);
+        assert!(matches!(send_result, MaybeSendResult::Noop));
 
         // Writing the first shard of a multi-sharded situation (i.e. in a split)
         // resets the tenant state and puts it in an non-notifying state (need to
@@ -430,7 +550,10 @@ pub(crate) mod tests {
             ShardStripeSize(32768),
             NodeId(1),
         );
-        assert!(tenant_state.maybe_reconfigure(tenant_id).is_none());
+        assert!(matches!(
+            tenant_state.maybe_send(tenant_id, None),
+            MaybeSendResult::Noop
+        ));
 
         // Writing the second shard makes it ready to notify
        tenant_state.update(
@@ -443,22 +566,16 @@ pub(crate) mod tests {
             NodeId(1),
         );
 
-        assert!(tenant_state.maybe_reconfigure(tenant_id).is_some());
-        assert_eq!(
-            tenant_state
-                .maybe_reconfigure(tenant_id)
-                .unwrap()
-                .shards
-                .len(),
-            2
-        );
-        assert_eq!(
-            tenant_state
-                .maybe_reconfigure(tenant_id)
-                .unwrap()
-                .stripe_size,
-            Some(ShardStripeSize(32768))
-        );
+        let send_result = tenant_state.maybe_send(tenant_id, None);
+        let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
+            anyhow::bail!("Wrong send result");
+        };
+        assert_eq!(request.shards.len(), 2);
+        assert_eq!(request.stripe_size, Some(ShardStripeSize(32768)));
+
+        // Simulate successful send
+        *guard = Some(request);
+        drop(guard);
 
         Ok(())
     }