use std::borrow::Cow; use std::collections::HashMap; use std::error::Error as _; use std::sync::Arc; use std::time::Duration; use anyhow::Context; use compute_api::spec::PageserverProtocol; use compute_api::spec::PageserverShardInfo; use control_plane::endpoint::{ ComputeControlPlane, EndpointStatus, PageserverConnectionInfo, PageserverShardConnectionInfo, }; use control_plane::local_env::LocalEnv; use futures::StreamExt; use hyper::StatusCode; use pageserver_api::config::DEFAULT_GRPC_LISTEN_PORT; use pageserver_api::controller_api::AvailabilityZone; use pageserver_api::shard::{ShardCount, ShardIndex, ShardNumber, ShardStripeSize, TenantShardId}; use postgres_connection::parse_host_port; use safekeeper_api::membership::SafekeeperGeneration; use serde::{Deserialize, Serialize}; use tokio_util::sync::CancellationToken; use tracing::{Instrument, info_span}; use utils::backoff::{self}; use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId}; use crate::service::Config; const SLOWDOWN_DELAY: Duration = Duration::from_secs(5); const NOTIFY_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); pub(crate) const API_CONCURRENCY: usize = 32; struct UnshardedComputeHookTenant { // Which node is this tenant attached to node_id: NodeId, // The tenant's preferred AZ, so that we may pass this on to the control plane preferred_az: Option, // Must hold this lock to send a notification. send_lock: Arc>>, } struct ShardedComputeHookTenant { stripe_size: ShardStripeSize, shard_count: ShardCount, shards: Vec<(ShardNumber, NodeId)>, // The tenant's preferred AZ, so that we may pass this on to the control plane preferred_az: Option, // Must hold this lock to send a notification. The contents represent // the last successfully sent notification, and are used to coalesce multiple // updates by only sending when there is a chance since our last successful send. send_lock: Arc>>, } /// Represents our knowledge of the compute's state: we can update this when we get a /// response from a notify API call, which tells us what has been applied. /// /// Should be wrapped in an Option<>, as we cannot always know the remote state. #[derive(PartialEq, Eq, Debug)] struct ComputeRemoteState { // The request body which was acked by the compute request: R, // Whether the cplane indicated that the state was applied to running computes, or just // persisted. In the Neon control plane, this is the difference between a 423 response (meaning // persisted but not applied), and a 2xx response (both persisted and applied) applied: bool, } type ComputeRemoteTenantState = ComputeRemoteState; type ComputeRemoteTimelineState = ComputeRemoteState; /// The trait which define the handler-specific types and methods. /// We have two implementations of this trait so far: /// - [`ComputeHookTenant`] for tenant attach notifications ("/notify-attach") /// - [`ComputeHookTimeline`] for safekeeper change notifications ("/notify-safekeepers") trait ApiMethod { /// Type of the key which identifies the resource. /// It's either TenantId for tenant attach notifications, /// or TenantTimelineId for safekeeper change notifications. type Key: std::cmp::Eq + std::hash::Hash + Clone; type Request: serde::Serialize + std::fmt::Debug; const API_PATH: &'static str; fn maybe_send( &self, key: Self::Key, lock: Option>>>, ) -> MaybeSendResult; async fn notify_local( env: &LocalEnv, cplane: &ComputeControlPlane, req: &Self::Request, ) -> Result<(), NotifyError>; } enum ComputeHookTenant { Unsharded(UnshardedComputeHookTenant), Sharded(ShardedComputeHookTenant), } impl ComputeHookTenant { /// Construct with at least one shard's information fn new( tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize, preferred_az: Option, node_id: NodeId, ) -> Self { if tenant_shard_id.shard_count.count() > 1 { Self::Sharded(ShardedComputeHookTenant { shards: vec![(tenant_shard_id.shard_number, node_id)], stripe_size, shard_count: tenant_shard_id.shard_count, preferred_az, send_lock: Arc::default(), }) } else { Self::Unsharded(UnshardedComputeHookTenant { node_id, preferred_az, send_lock: Arc::default(), }) } } fn get_send_lock(&self) -> &Arc>> { match self { Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock, Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock, } } fn is_sharded(&self) -> bool { matches!(self, ComputeHookTenant::Sharded(_)) } /// Clear compute hook state for the specified shard. /// Only valid for [`ComputeHookTenant::Sharded`] instances. fn remove_shard(&mut self, tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize) { match self { ComputeHookTenant::Sharded(sharded) => { if sharded.stripe_size != stripe_size || sharded.shard_count != tenant_shard_id.shard_count { tracing::warn!("Shard split detected while handling detach") } let shard_idx = sharded.shards.iter().position(|(shard_number, _node_id)| { *shard_number == tenant_shard_id.shard_number }); if let Some(shard_idx) = shard_idx { sharded.shards.remove(shard_idx); } else { // This is a valid but niche case, where the tenant was previously attached // as a Secondary location and then detached, so has no previously notified // state. tracing::info!("Shard not found while handling detach") } } ComputeHookTenant::Unsharded(_) => { unreachable!("Detach of unsharded tenants is handled externally"); } } } /// Set one shard's location. If stripe size or shard count have changed, Self is reset /// and drops existing content. fn update(&mut self, shard_update: ShardUpdate) { let tenant_shard_id = shard_update.tenant_shard_id; let node_id = shard_update.node_id; let stripe_size = shard_update.stripe_size; let preferred_az = shard_update.preferred_az; match self { Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => { unsharded_tenant.node_id = node_id; if unsharded_tenant.preferred_az.as_ref() != preferred_az.as_ref().map(|az| az.as_ref()) { unsharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone()); } } Self::Sharded(sharded_tenant) if sharded_tenant.stripe_size == stripe_size && sharded_tenant.shard_count == tenant_shard_id.shard_count => { if let Some(existing) = sharded_tenant .shards .iter() .position(|s| s.0 == tenant_shard_id.shard_number) { sharded_tenant.shards.get_mut(existing).unwrap().1 = node_id; } else { sharded_tenant .shards .push((tenant_shard_id.shard_number, node_id)); sharded_tenant.shards.sort_by_key(|s| s.0) } if sharded_tenant.preferred_az.as_ref() != preferred_az.as_ref().map(|az| az.as_ref()) { sharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone()); } } _ => { // Shard count changed: reset struct. *self = Self::new( tenant_shard_id, stripe_size, preferred_az.map(|az| az.into_owned()), node_id, ); } } } } /// The state of a timeline we need to notify the compute about. struct ComputeHookTimeline { generation: SafekeeperGeneration, safekeepers: Vec, send_lock: Arc>>, } impl ComputeHookTimeline { /// Construct a new ComputeHookTimeline with the given safekeepers and generation. fn new(generation: SafekeeperGeneration, safekeepers: Vec) -> Self { Self { generation, safekeepers, send_lock: Arc::default(), } } /// Update the state with a new SafekeepersUpdate. /// Noop if the update generation is not greater than the current generation. fn update(&mut self, sk_update: SafekeepersUpdate) { if sk_update.generation > self.generation { self.generation = sk_update.generation; self.safekeepers = sk_update.safekeepers; } } } impl ApiMethod for ComputeHookTimeline { type Key = TenantTimelineId; type Request = NotifySafekeepersRequest; const API_PATH: &'static str = "notify-safekeepers"; fn maybe_send( &self, ttid: TenantTimelineId, lock: Option>>, ) -> MaybeSendNotifySafekeepersResult { let locked = match lock { Some(already_locked) => already_locked, None => { // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::timelines`] lock. let Ok(locked) = self.send_lock.clone().try_lock_owned() else { return MaybeSendResult::AwaitLock((ttid, self.send_lock.clone())); }; locked } }; if locked .as_ref() .is_some_and(|s| s.request.generation >= self.generation) { return MaybeSendResult::Noop; } MaybeSendResult::Transmit(( NotifySafekeepersRequest { tenant_id: ttid.tenant_id, timeline_id: ttid.timeline_id, generation: self.generation, safekeepers: self.safekeepers.clone(), }, locked, )) } async fn notify_local( _env: &LocalEnv, cplane: &ComputeControlPlane, req: &NotifySafekeepersRequest, ) -> Result<(), NotifyError> { let NotifySafekeepersRequest { tenant_id, timeline_id, generation, safekeepers, } = req; for (endpoint_name, endpoint) in &cplane.endpoints { if endpoint.tenant_id == *tenant_id && endpoint.timeline_id == *timeline_id && endpoint.status() == EndpointStatus::Running { tracing::info!("Reconfiguring safekeepers for endpoint {endpoint_name}"); let safekeepers = safekeepers.iter().map(|sk| sk.id).collect::>(); endpoint .reconfigure_safekeepers(safekeepers, *generation) .await .map_err(NotifyError::NeonLocal)?; } } Ok(()) } } #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] struct NotifyAttachRequestShard { node_id: NodeId, shard_number: ShardNumber, } /// Request body that we send to the control plane to notify it of where a tenant is attached #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] struct NotifyAttachRequest { tenant_id: TenantId, preferred_az: Option, stripe_size: Option, shards: Vec, } #[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Clone)] pub(crate) struct SafekeeperInfo { pub id: NodeId, /// Hostname of the safekeeper. /// It exists for better debuggability. Might be missing. /// Should not be used for anything else. pub hostname: Option, } #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] struct NotifySafekeepersRequest { tenant_id: TenantId, timeline_id: TimelineId, generation: SafekeeperGeneration, safekeepers: Vec, } /// Error type for attempts to call into the control plane compute notification hook #[derive(thiserror::Error, Debug)] pub(crate) enum NotifyError { // Request was not send successfully, e.g. transport error #[error("Sending request: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())] Request(#[from] reqwest::Error), // Request could not be serviced right now due to ongoing Operation in control plane, but should be possible soon. #[error("Control plane tenant busy")] Busy, // Explicit 429 response asking us to retry less frequently #[error("Control plane overloaded")] SlowDown, // A 503 response indicates the control plane can't handle the request right now #[error("Control plane unavailable (status {0})")] Unavailable(StatusCode), // API returned unexpected non-success status. We will retry, but log a warning. #[error("Control plane returned unexpected status {0}")] Unexpected(StatusCode), // We shutdown while sending #[error("Shutting down")] ShuttingDown, // A response indicates we will never succeed, such as 400 or 403 #[error("Non-retryable error {0}")] Fatal(StatusCode), #[error("neon_local error: {0}")] NeonLocal(anyhow::Error), } enum MaybeSendResult { // Please send this request while holding the lock, and if you succeed then write // the request into the lock. Transmit( ( R, tokio::sync::OwnedMutexGuard>>, ), ), // Something requires sending, but you must wait for a current sender then call again AwaitLock((K, Arc>>>)), // Nothing requires sending Noop, } type MaybeSendNotifyAttachResult = MaybeSendResult; type MaybeSendNotifySafekeepersResult = MaybeSendResult; impl ApiMethod for ComputeHookTenant { type Key = TenantId; type Request = NotifyAttachRequest; const API_PATH: &'static str = "notify-attach"; fn maybe_send( &self, tenant_id: TenantId, lock: Option>>, ) -> MaybeSendNotifyAttachResult { let locked = match lock { Some(already_locked) => already_locked, None => { // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::tenants`] lock. let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else { return MaybeSendResult::AwaitLock((tenant_id, self.get_send_lock().clone())); }; locked } }; let request = match self { Self::Unsharded(unsharded_tenant) => Some(NotifyAttachRequest { tenant_id, shards: vec![NotifyAttachRequestShard { shard_number: ShardNumber(0), node_id: unsharded_tenant.node_id, }], stripe_size: None, preferred_az: unsharded_tenant .preferred_az .as_ref() .map(|az| az.0.clone()), }), Self::Sharded(sharded_tenant) if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize => { Some(NotifyAttachRequest { tenant_id, shards: sharded_tenant .shards .iter() .map(|(shard_number, node_id)| NotifyAttachRequestShard { shard_number: *shard_number, node_id: *node_id, }) .collect(), stripe_size: Some(sharded_tenant.stripe_size), preferred_az: sharded_tenant.preferred_az.as_ref().map(|az| az.0.clone()), }) } Self::Sharded(sharded_tenant) => { // Sharded tenant doesn't yet have information for all its shards tracing::info!( "ComputeHookTenant::maybe_send: not enough shards ({}/{})", sharded_tenant.shards.len(), sharded_tenant.shard_count.count() ); None } }; match request { None => { // Not yet ready to emit a notification tracing::info!("Tenant isn't yet ready to emit a notification"); MaybeSendResult::Noop } Some(request) if Some(&request) == locked.as_ref().map(|s| &s.request) && locked.as_ref().map(|s| s.applied).unwrap_or(false) => { tracing::info!( "Skipping notification because remote state already matches ({:?})", &request ); // No change from the last value successfully sent, and our state indicates that the last // value sent was fully applied on the control plane side. MaybeSendResult::Noop } Some(request) => { // Our request differs from the last one sent, or the last one sent was not fully applied on the compute side MaybeSendResult::Transmit((request, locked)) } } } async fn notify_local( env: &LocalEnv, cplane: &ComputeControlPlane, req: &NotifyAttachRequest, ) -> Result<(), NotifyError> { let NotifyAttachRequest { tenant_id, shards, stripe_size, preferred_az: _preferred_az, } = req; for (endpoint_name, endpoint) in &cplane.endpoints { if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running { tracing::info!("Reconfiguring pageservers for endpoint {endpoint_name}"); let shard_count = match shards.len() { 1 => ShardCount::unsharded(), n => ShardCount(n.try_into().expect("too many shards")), }; let mut shard_infos: HashMap = HashMap::new(); let prefer_protocol = if endpoint.grpc { PageserverProtocol::Grpc } else { PageserverProtocol::Libpq }; for shard in shards.iter() { let ps_conf = env .get_pageserver_conf(shard.node_id) .expect("Unknown pageserver"); let libpq_url = Some({ let (host, port) = parse_host_port(&ps_conf.listen_pg_addr) .expect("Unable to parse listen_pg_addr"); let port = port.unwrap_or(5432); format!("postgres://no_user@{host}:{port}") }); let grpc_url = if let Some(grpc_addr) = &ps_conf.listen_grpc_addr { let (host, port) = parse_host_port(grpc_addr).expect("invalid gRPC address"); let port = port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT); Some(format!("grpc://no_user@{host}:{port}")) } else { None }; let pageserver = PageserverShardConnectionInfo { id: Some(shard.node_id), libpq_url, grpc_url, }; let shard_info = PageserverShardInfo { pageservers: vec![pageserver], }; shard_infos.insert( ShardIndex { shard_number: shard.shard_number, shard_count, }, shard_info, ); } let pageserver_conninfo = PageserverConnectionInfo { shard_count, stripe_size: stripe_size.map(|val| ShardStripeSize(val.0)), shards: shard_infos, prefer_protocol, }; endpoint .reconfigure_pageservers(&pageserver_conninfo) .await .map_err(NotifyError::NeonLocal)?; } } Ok(()) } } /// The compute hook is a destination for notifications about changes to tenant:pageserver /// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures /// the compute connection string. pub(super) struct ComputeHook { config: Config, tenants: std::sync::Mutex>, timelines: std::sync::Mutex>, authorization_header: Option, // Concurrency limiter, so that we do not overload the cloud control plane when updating // large numbers of tenants (e.g. when failing over after a node failure) api_concurrency: tokio::sync::Semaphore, // This lock is only used in testing enviroments, to serialize calls into neon_local neon_local_lock: tokio::sync::Mutex<()>, // We share a client across all notifications to enable connection re-use etc when // sending large numbers of notifications client: reqwest::Client, } /// Callers may give us a list of these when asking us to send a bulk batch /// of notifications in the background. This is a 'notification' in the sense of /// other code notifying us of a shard's status, rather than being the final notification /// that we send upwards to the control plane for the whole tenant. pub(crate) struct ShardUpdate<'a> { pub(crate) tenant_shard_id: TenantShardId, pub(crate) node_id: NodeId, pub(crate) stripe_size: ShardStripeSize, pub(crate) preferred_az: Option>, } pub(crate) struct SafekeepersUpdate { pub(crate) tenant_id: TenantId, pub(crate) timeline_id: TimelineId, pub(crate) generation: SafekeeperGeneration, pub(crate) safekeepers: Vec, } impl ComputeHook { pub(super) fn new(config: Config) -> anyhow::Result { let authorization_header = config .control_plane_jwt_token .clone() .map(|jwt| format!("Bearer {jwt}")); let mut client = reqwest::ClientBuilder::new().timeout(NOTIFY_REQUEST_TIMEOUT); for cert in &config.ssl_ca_certs { client = client.add_root_certificate(cert.clone()); } let client = client .build() .context("Failed to build http client for compute hook")?; Ok(Self { tenants: Default::default(), timelines: Default::default(), config, authorization_header, neon_local_lock: Default::default(), api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY), client, }) } /// For test environments: use neon_local's LocalEnv to update compute async fn do_notify_local(&self, req: &M::Request) -> Result<(), NotifyError> { // neon_local updates are not safe to call concurrently, use a lock to serialize // all calls to this function let _locked = self.neon_local_lock.lock().await; let Some(repo_dir) = self.config.neon_local_repo_dir.as_deref() else { tracing::warn!( "neon_local_repo_dir not set, likely a bug in neon_local; skipping compute update" ); return Ok(()); }; let env = match LocalEnv::load_config(repo_dir) { Ok(e) => e, Err(e) => { tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})"); return Ok(()); } }; let cplane = ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane"); M::notify_local(&env, &cplane, req).await } async fn do_notify_iteration( &self, url: &String, reconfigure_request: &Req, cancel: &CancellationToken, ) -> Result<(), NotifyError> { let req = self.client.request(reqwest::Method::PUT, url); let req = if let Some(value) = &self.authorization_header { req.header(reqwest::header::AUTHORIZATION, value) } else { req }; tracing::info!( "Sending notify request to {} ({:?})", url, reconfigure_request ); let send_result = req.json(&reconfigure_request).send().await; let response = match send_result { Ok(r) => r, Err(e) => return Err(e.into()), }; // Treat all 2xx responses as success if response.status().is_success() { if response.status() != reqwest::StatusCode::OK { // Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so // log a warning. tracing::warn!( "Unexpected 2xx response code {} from control plane", response.status() ); } return Ok(()); } // Error response codes match response.status() { reqwest::StatusCode::TOO_MANY_REQUESTS => { // TODO: 429 handling should be global: set some state visible to other requests // so that they will delay before starting, rather than all notifications trying // once before backing off. tokio::time::timeout(SLOWDOWN_DELAY, cancel.cancelled()) .await .ok(); Err(NotifyError::SlowDown) } reqwest::StatusCode::LOCKED => { // We consider this fatal, because it's possible that the operation blocking the control one is // also the one that is waiting for this reconcile. We should let the reconciler calling // this hook fail, to give control plane a chance to un-lock. tracing::info!("Control plane reports tenant is locked, dropping out of notify"); Err(NotifyError::Busy) } reqwest::StatusCode::SERVICE_UNAVAILABLE => { Err(NotifyError::Unavailable(StatusCode::SERVICE_UNAVAILABLE)) } reqwest::StatusCode::GATEWAY_TIMEOUT => { Err(NotifyError::Unavailable(StatusCode::GATEWAY_TIMEOUT)) } reqwest::StatusCode::BAD_GATEWAY => { Err(NotifyError::Unavailable(StatusCode::BAD_GATEWAY)) } reqwest::StatusCode::BAD_REQUEST => Err(NotifyError::Fatal(StatusCode::BAD_REQUEST)), reqwest::StatusCode::UNAUTHORIZED => Err(NotifyError::Fatal(StatusCode::UNAUTHORIZED)), reqwest::StatusCode::FORBIDDEN => Err(NotifyError::Fatal(StatusCode::FORBIDDEN)), status => Err(NotifyError::Unexpected( hyper::StatusCode::from_u16(status.as_u16()) .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), )), } } async fn do_notify( &self, url: &String, reconfigure_request: &R, cancel: &CancellationToken, ) -> Result<(), NotifyError> { // We hold these semaphore units across all retries, rather than only across each // HTTP request: this is to preserve fairness and avoid a situation where a retry might // time out waiting for a semaphore. let _units = self .api_concurrency .acquire() .await // Interpret closed semaphore as shutdown .map_err(|_| NotifyError::ShuttingDown)?; backoff::retry( || self.do_notify_iteration(url, reconfigure_request, cancel), |e| { matches!( e, NotifyError::Fatal(_) | NotifyError::Unexpected(_) | NotifyError::Busy ) }, 3, 10, "Send compute notification", cancel, ) .await .ok_or_else(|| NotifyError::ShuttingDown) .and_then(|x| x) } /// Synchronous phase: update the per-tenant state for the next intended notification fn notify_attach_prepare(&self, shard_update: ShardUpdate) -> MaybeSendNotifyAttachResult { let mut tenants_locked = self.tenants.lock().unwrap(); use std::collections::hash_map::Entry; let tenant_shard_id = shard_update.tenant_shard_id; let tenant = match tenants_locked.entry(tenant_shard_id.tenant_id) { Entry::Vacant(e) => { let ShardUpdate { tenant_shard_id, node_id, stripe_size, preferred_az, } = shard_update; e.insert(ComputeHookTenant::new( tenant_shard_id, stripe_size, preferred_az.map(|az| az.into_owned()), node_id, )) } Entry::Occupied(e) => { let tenant = e.into_mut(); tenant.update(shard_update); tenant } }; tenant.maybe_send(tenant_shard_id.tenant_id, None) } fn notify_safekeepers_prepare( &self, safekeepers_update: SafekeepersUpdate, ) -> MaybeSendNotifySafekeepersResult { let mut timelines_locked = self.timelines.lock().unwrap(); let ttid = TenantTimelineId { tenant_id: safekeepers_update.tenant_id, timeline_id: safekeepers_update.timeline_id, }; use std::collections::hash_map::Entry; let timeline = match timelines_locked.entry(ttid) { Entry::Vacant(e) => e.insert(ComputeHookTimeline::new( safekeepers_update.generation, safekeepers_update.safekeepers, )), Entry::Occupied(e) => { let timeline = e.into_mut(); timeline.update(safekeepers_update); timeline } }; timeline.maybe_send(ttid, None) } async fn notify_execute( &self, state: &std::sync::Mutex>, maybe_send_result: MaybeSendResult, cancel: &CancellationToken, ) -> Result<(), NotifyError> { // Process result: we may get an update to send, or we may have to wait for a lock // before trying again. let (request, mut send_lock_guard) = match maybe_send_result { MaybeSendResult::Noop => { return Ok(()); } MaybeSendResult::AwaitLock((key, send_lock)) => { let send_locked = tokio::select! { guard = send_lock.lock_owned() => {guard}, _ = cancel.cancelled() => { tracing::info!("Notification cancelled while waiting for lock"); return Err(NotifyError::ShuttingDown) } }; // Lock order: maybe_send is called within the `[Self::state]` lock, and takes the send lock, but here // we have acquired the send lock and take `[Self::state]` lock. This is safe because maybe_send only uses // try_lock. let state_locked = state.lock().unwrap(); let Some(resource_state) = state_locked.get(&key) else { return Ok(()); }; match resource_state.maybe_send(key, Some(send_locked)) { MaybeSendResult::AwaitLock(_) => { unreachable!("We supplied lock guard") } MaybeSendResult::Noop => { return Ok(()); } MaybeSendResult::Transmit((request, lock)) => (request, lock), } } MaybeSendResult::Transmit((request, lock)) => (request, lock), }; let result = if !self.config.use_local_compute_notifications { let compute_hook_url = self.config .control_plane_url .as_ref() .map(|control_plane_url| { format!( "{}/{}", control_plane_url.trim_end_matches('/'), M::API_PATH ) }); // We validate this at startup let notify_url = compute_hook_url.as_ref().unwrap(); self.do_notify(notify_url, &request, cancel).await } else { match self.do_notify_local::(&request).await.map_err(|e| { // This path is for testing only, so munge the error into our prod-style error type. if e.to_string().contains("refresh-configuration-pending") { // If the error message mentions "refresh-configuration-pending", it means the compute node // rejected our notification request because it already trying to reconfigure itself. We // can proceed with the rest of the reconcliation process as the compute node already // discovers the need to reconfigure and will eventually update its configuration once // we update the pageserver mappings. In fact, it is important that we continue with // reconcliation to make sure we update the pageserver mappings to unblock the compute node. tracing::info!("neon_local notification hook failed: {e}"); tracing::info!("Notification failed likely due to compute node self-reconfiguration, will retry."); Ok(()) } else { tracing::error!("neon_local notification hook failed: {e}"); Err(NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)) } }) { // Compute node accepted the notification request. Ok to proceed. Ok(_) => Ok(()), // Compute node rejected our request but it is already self-reconfiguring. Ok to proceed. Err(Ok(_)) => Ok(()), // Fail the reconciliation attempt in all other cases. Recall that this whole code path involving // neon_local is for testing only. In production we always retry failed reconcliations so we // don't have any deadends here. Err(Err(e)) => Err(e), } }; match result { Ok(_) => { // Before dropping the send lock, stash the request we just sent so that // subsequent callers can avoid redundantly re-sending the same thing. *send_lock_guard = Some(ComputeRemoteState { request, applied: true, }); } Err(NotifyError::Busy) => { // Busy result means that the server responded and has stored the new configuration, // but was not able to fully apply it to the compute *send_lock_guard = Some(ComputeRemoteState { request, applied: false, }); } Err(_) => { // General error case: we can no longer know the remote state, so clear it. This will result in // the logic in maybe_send recognizing that we should call the hook again. *send_lock_guard = None; } } result } /// Infallible synchronous fire-and-forget version of notify(), that sends its results to /// a channel. Something should consume the channel and arrange to try notifying again /// if something failed. pub(super) fn notify_attach_background( self: &Arc, notifications: Vec, result_tx: tokio::sync::mpsc::Sender>, cancel: &CancellationToken, ) { let mut maybe_sends = Vec::new(); for shard_update in notifications { let tenant_shard_id = shard_update.tenant_shard_id; let maybe_send_result = self.notify_attach_prepare(shard_update); maybe_sends.push((tenant_shard_id, maybe_send_result)) } let this = self.clone(); let cancel = cancel.clone(); tokio::task::spawn(async move { // Construct an async stream of futures to invoke the compute notify function: we do this // in order to subsequently use .buffered() on the stream to execute with bounded parallelism. The // ComputeHook semaphore already limits concurrency, but this way we avoid constructing+polling lots of futures which // would mostly just be waiting on that semaphore. let mut stream = futures::stream::iter(maybe_sends) .map(|(tenant_shard_id, maybe_send_result)| { let this = this.clone(); let cancel = cancel.clone(); async move { this .notify_execute(&this.tenants, maybe_send_result, &cancel) .await.map_err(|e| (tenant_shard_id, e)) }.instrument(info_span!( "notify_attach_background", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug() )) }) .buffered(API_CONCURRENCY); loop { tokio::select! { next = stream.next() => { match next { Some(r) => { result_tx.send(r).await.ok(); }, None => { tracing::info!("Finished sending background compute notifications"); break; } } }, _ = cancel.cancelled() => { tracing::info!("Shutdown while running background compute notifications"); break; } }; } }); } /// Call this to notify the compute (postgres) tier of new pageservers to use /// for a tenant. notify() is called by each shard individually, and this function /// will decide whether an update to the tenant is sent. An update is sent on the /// condition that: /// - We know a pageserver for every shard. /// - All the shards have the same shard_count (i.e. we are not mid-split) /// /// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler /// that is cancelled. /// /// This function is fallible, including in the case that the control plane is transiently /// unavailable. A limited number of retries are done internally to efficiently hide short unavailability /// periods, but we don't retry forever. The **caller** is responsible for handling failures and /// ensuring that they eventually call again to ensure that the compute is eventually notified of /// the proper pageserver nodes for a tenant. #[tracing::instrument(skip_all, fields(tenant_id=%shard_update.tenant_shard_id.tenant_id, shard_id=%shard_update.tenant_shard_id.shard_slug(), node_id))] pub(super) async fn notify_attach<'a>( &self, shard_update: ShardUpdate<'a>, cancel: &CancellationToken, ) -> Result<(), NotifyError> { let maybe_send_result = self.notify_attach_prepare(shard_update); self.notify_execute(&self.tenants, maybe_send_result, cancel) .await } pub(super) async fn notify_safekeepers( &self, safekeepers_update: SafekeepersUpdate, cancel: &CancellationToken, ) -> Result<(), NotifyError> { let maybe_send_result = self.notify_safekeepers_prepare(safekeepers_update); self.notify_execute(&self.timelines, maybe_send_result, cancel) .await } /// Reflect a detach for a particular shard in the compute hook state. /// /// The goal is to avoid sending compute notifications with stale information (i.e. /// including detach pageservers). #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))] pub(super) fn handle_detach( &self, tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize, ) { use std::collections::hash_map::Entry; let mut tenants_locked = self.tenants.lock().unwrap(); match tenants_locked.entry(tenant_shard_id.tenant_id) { Entry::Vacant(_) => { // This is a valid but niche case, where the tenant was previously attached // as a Secondary location and then detached, so has no previously notified // state. tracing::info!("Compute hook tenant not found for detach"); } Entry::Occupied(mut e) => { let sharded = e.get().is_sharded(); if !sharded { e.remove(); } else { e.get_mut().remove_shard(tenant_shard_id, stripe_size); } tracing::debug!("Compute hook handled shard detach"); } } } } #[cfg(test)] pub(crate) mod tests { use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardNumber}; use utils::id::TenantId; use super::*; #[test] fn tenant_updates() -> anyhow::Result<()> { let tenant_id = TenantId::generate(); let stripe_size = DEFAULT_STRIPE_SIZE; let mut tenant_state = ComputeHookTenant::new( TenantShardId { tenant_id, shard_count: ShardCount::new(0), shard_number: ShardNumber(0), }, ShardStripeSize(12345), None, NodeId(1), ); // An unsharded tenant is always ready to emit a notification, but won't // send the same one twice let send_result = tenant_state.maybe_send(tenant_id, None); let MaybeSendResult::Transmit((request, mut guard)) = send_result else { anyhow::bail!("Wrong send result"); }; assert_eq!(request.shards.len(), 1); assert!(request.stripe_size.is_none()); // Simulate successful send *guard = Some(ComputeRemoteState { request, applied: true, }); drop(guard); // Try asking again: this should be a no-op let send_result = tenant_state.maybe_send(tenant_id, None); assert!(matches!(send_result, MaybeSendResult::Noop)); // Writing the first shard of a multi-sharded situation (i.e. in a split) // resets the tenant state and puts it in an non-notifying state (need to // see all shards) tenant_state.update(ShardUpdate { tenant_shard_id: TenantShardId { tenant_id, shard_count: ShardCount::new(2), shard_number: ShardNumber(1), }, stripe_size, preferred_az: None, node_id: NodeId(1), }); assert!(matches!( tenant_state.maybe_send(tenant_id, None), MaybeSendResult::Noop )); // Writing the second shard makes it ready to notify tenant_state.update(ShardUpdate { tenant_shard_id: TenantShardId { tenant_id, shard_count: ShardCount::new(2), shard_number: ShardNumber(0), }, stripe_size, preferred_az: None, node_id: NodeId(1), }); let send_result = tenant_state.maybe_send(tenant_id, None); let MaybeSendResult::Transmit((request, mut guard)) = send_result else { anyhow::bail!("Wrong send result"); }; assert_eq!(request.shards.len(), 2); assert_eq!(request.stripe_size, Some(stripe_size)); // Simulate successful send *guard = Some(ComputeRemoteState { request, applied: true, }); drop(guard); Ok(()) } }