mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 12:02:55 +00:00
proxy: Refactor cplane types (#9643)
The overall idea of the PR is to rename a few types to make their purpose more clear, reduce abstraction where not needed, and move types to to more better suited modules.
This commit is contained in:
@@ -9,8 +9,7 @@ use super::ComputeCredentialKeys;
|
||||
use crate::cache::Cached;
|
||||
use crate::config::AuthenticationConfig;
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::control_plane::provider::NodeInfo;
|
||||
use crate::control_plane::{self, CachedNodeInfo};
|
||||
use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
|
||||
use crate::error::{ReportableError, UserFacingError};
|
||||
use crate::proxy::connect_compute::ComputeConnectBackend;
|
||||
use crate::stream::PqStream;
|
||||
|
||||
@@ -21,11 +21,11 @@ use crate::auth::{self, validate_password_and_exchange, AuthError, ComputeUserIn
|
||||
use crate::cache::Cached;
|
||||
use crate::config::AuthenticationConfig;
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::control_plane::client::ControlPlaneClient;
|
||||
use crate::control_plane::errors::GetAuthInfoError;
|
||||
use crate::control_plane::provider::{
|
||||
CachedAllowedIps, CachedNodeInfo, CachedRoleSecret, ControlPlaneBackend,
|
||||
use crate::control_plane::{
|
||||
self, AuthSecret, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret, ControlPlaneApi,
|
||||
};
|
||||
use crate::control_plane::{self, Api, AuthSecret};
|
||||
use crate::intern::EndpointIdInt;
|
||||
use crate::metrics::Metrics;
|
||||
use crate::proxy::connect_compute::ComputeConnectBackend;
|
||||
@@ -62,42 +62,26 @@ impl<T> std::ops::Deref for MaybeOwned<'_, T> {
|
||||
/// backends which require them for the authentication process.
|
||||
pub enum Backend<'a, T> {
|
||||
/// Cloud API (V2).
|
||||
ControlPlane(MaybeOwned<'a, ControlPlaneBackend>, T),
|
||||
ControlPlane(MaybeOwned<'a, ControlPlaneClient>, T),
|
||||
/// Local proxy uses configured auth credentials and does not wake compute
|
||||
Local(MaybeOwned<'a, LocalBackend>),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) trait TestBackend: Send + Sync + 'static {
|
||||
fn wake_compute(&self) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError>;
|
||||
fn get_allowed_ips_and_secret(
|
||||
&self,
|
||||
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), control_plane::errors::GetAuthInfoError>;
|
||||
fn dyn_clone(&self) -> Box<dyn TestBackend>;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl Clone for Box<dyn TestBackend> {
|
||||
fn clone(&self) -> Self {
|
||||
TestBackend::dyn_clone(&**self)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Backend<'_, ()> {
|
||||
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::ControlPlane(api, ()) => match &**api {
|
||||
ControlPlaneBackend::Management(endpoint) => fmt
|
||||
.debug_tuple("ControlPlane::Management")
|
||||
ControlPlaneClient::Neon(endpoint) => fmt
|
||||
.debug_tuple("ControlPlane::Neon")
|
||||
.field(&endpoint.url())
|
||||
.finish(),
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
ControlPlaneBackend::PostgresMock(endpoint) => fmt
|
||||
ControlPlaneClient::PostgresMock(endpoint) => fmt
|
||||
.debug_tuple("ControlPlane::PostgresMock")
|
||||
.field(&endpoint.url())
|
||||
.finish(),
|
||||
#[cfg(test)]
|
||||
ControlPlaneBackend::Test(_) => fmt.debug_tuple("ControlPlane::Test").finish(),
|
||||
ControlPlaneClient::Test(_) => fmt.debug_tuple("ControlPlane::Test").finish(),
|
||||
},
|
||||
Self::Local(_) => fmt.debug_tuple("Local").finish(),
|
||||
}
|
||||
@@ -282,7 +266,7 @@ impl AuthenticationConfig {
|
||||
/// All authentication flows will emit an AuthenticationOk message if successful.
|
||||
async fn auth_quirks(
|
||||
ctx: &RequestMonitoring,
|
||||
api: &impl control_plane::Api,
|
||||
api: &impl control_plane::ControlPlaneApi,
|
||||
user_info: ComputeUserInfoMaybeEndpoint,
|
||||
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
|
||||
allow_cleartext: bool,
|
||||
@@ -499,12 +483,12 @@ mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use bytes::BytesMut;
|
||||
use control_plane::AuthSecret;
|
||||
use fallible_iterator::FallibleIterator;
|
||||
use once_cell::sync::Lazy;
|
||||
use postgres_protocol::authentication::sasl::{ChannelBinding, ScramSha256};
|
||||
use postgres_protocol::message::backend::Message as PgMessage;
|
||||
use postgres_protocol::message::frontend;
|
||||
use provider::AuthSecret;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
use super::jwt::JwkCache;
|
||||
@@ -513,8 +497,7 @@ mod tests {
|
||||
use crate::auth::{ComputeUserInfoMaybeEndpoint, IpPattern};
|
||||
use crate::config::AuthenticationConfig;
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::control_plane::provider::{self, CachedAllowedIps, CachedRoleSecret};
|
||||
use crate::control_plane::{self, CachedNodeInfo};
|
||||
use crate::control_plane::{self, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret};
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::rate_limiter::{EndpointRateLimiter, RateBucketInfo};
|
||||
use crate::scram::threadpool::ThreadPool;
|
||||
@@ -526,7 +509,7 @@ mod tests {
|
||||
secret: AuthSecret,
|
||||
}
|
||||
|
||||
impl control_plane::Api for Auth {
|
||||
impl control_plane::ControlPlaneApi for Auth {
|
||||
async fn get_role_secret(
|
||||
&self,
|
||||
_ctx: &RequestMonitoring,
|
||||
|
||||
@@ -513,7 +513,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
if let Either::Left(auth::Backend::ControlPlane(api, _)) = &auth_backend {
|
||||
if let proxy::control_plane::provider::ControlPlaneBackend::Management(api) = &**api {
|
||||
if let proxy::control_plane::client::ControlPlaneClient::Neon(api) = &**api {
|
||||
match (redis_notifications_client, regional_redis_client.clone()) {
|
||||
(None, None) => {}
|
||||
(client1, client2) => {
|
||||
@@ -732,13 +732,13 @@ fn build_auth_backend(
|
||||
RateBucketInfo::validate(&mut wake_compute_rps_limit)?;
|
||||
let wake_compute_endpoint_rate_limiter =
|
||||
Arc::new(WakeComputeRateLimiter::new(wake_compute_rps_limit));
|
||||
let api = control_plane::provider::neon::Api::new(
|
||||
let api = control_plane::client::neon::NeonControlPlaneClient::new(
|
||||
endpoint,
|
||||
caches,
|
||||
locks,
|
||||
wake_compute_endpoint_rate_limiter,
|
||||
);
|
||||
let api = control_plane::provider::ControlPlaneBackend::Management(api);
|
||||
let api = control_plane::client::ControlPlaneClient::Neon(api);
|
||||
let auth_backend = auth::Backend::ControlPlane(MaybeOwned::Owned(api), ());
|
||||
|
||||
let config = Box::leak(Box::new(auth_backend));
|
||||
@@ -749,8 +749,11 @@ fn build_auth_backend(
|
||||
#[cfg(feature = "testing")]
|
||||
AuthBackendType::Postgres => {
|
||||
let url = args.auth_endpoint.parse()?;
|
||||
let api = control_plane::provider::mock::Api::new(url, !args.is_private_access_proxy);
|
||||
let api = control_plane::provider::ControlPlaneBackend::PostgresMock(api);
|
||||
let api = control_plane::client::mock::MockControlPlane::new(
|
||||
url,
|
||||
!args.is_private_access_proxy,
|
||||
);
|
||||
let api = control_plane::client::ControlPlaneClient::PostgresMock(api);
|
||||
|
||||
let auth_backend = auth::Backend::ControlPlane(MaybeOwned::Owned(api), ());
|
||||
|
||||
|
||||
@@ -19,9 +19,9 @@ use tracing::{error, info, warn};
|
||||
use crate::auth::parse_endpoint_param;
|
||||
use crate::cancellation::CancelClosure;
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::control_plane::client::ApiLockError;
|
||||
use crate::control_plane::errors::WakeComputeError;
|
||||
use crate::control_plane::messages::MetricsAuxInfo;
|
||||
use crate::control_plane::provider::ApiLockError;
|
||||
use crate::error::{ReportableError, UserFacingError};
|
||||
use crate::metrics::{Metrics, NumDbConnectionsGuard};
|
||||
use crate::proxy::neon_option;
|
||||
|
||||
@@ -366,7 +366,7 @@ pub struct EndpointCacheConfig {
|
||||
}
|
||||
|
||||
impl EndpointCacheConfig {
|
||||
/// Default options for [`crate::control_plane::provider::NodeInfoCache`].
|
||||
/// Default options for [`crate::control_plane::NodeInfoCache`].
|
||||
/// Notice that by default the limiter is empty, which means that cache is disabled.
|
||||
pub const CACHE_DEFAULT_OPTIONS: &'static str =
|
||||
"initial_batch_size=1000,default_batch_size=10,xread_timeout=5m,stream_name=controlPlane,disable_cache=true,limiter_info=1000@1s,retry_interval=1s";
|
||||
@@ -441,7 +441,7 @@ pub struct CacheOptions {
|
||||
}
|
||||
|
||||
impl CacheOptions {
|
||||
/// Default options for [`crate::control_plane::provider::NodeInfoCache`].
|
||||
/// Default options for [`crate::control_plane::NodeInfoCache`].
|
||||
pub const CACHE_DEFAULT_OPTIONS: &'static str = "size=4000,ttl=4m";
|
||||
|
||||
/// Parse cache options passed via cmdline.
|
||||
@@ -497,7 +497,7 @@ pub struct ProjectInfoCacheOptions {
|
||||
}
|
||||
|
||||
impl ProjectInfoCacheOptions {
|
||||
/// Default options for [`crate::control_plane::provider::NodeInfoCache`].
|
||||
/// Default options for [`crate::control_plane::NodeInfoCache`].
|
||||
pub const CACHE_DEFAULT_OPTIONS: &'static str =
|
||||
"size=10000,ttl=4m,max_roles=10,gc_interval=60m";
|
||||
|
||||
@@ -616,9 +616,9 @@ pub struct ConcurrencyLockOptions {
|
||||
}
|
||||
|
||||
impl ConcurrencyLockOptions {
|
||||
/// Default options for [`crate::control_plane::provider::ApiLocks`].
|
||||
/// Default options for [`crate::control_plane::client::ApiLocks`].
|
||||
pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "permits=0";
|
||||
/// Default options for [`crate::control_plane::provider::ApiLocks`].
|
||||
/// Default options for [`crate::control_plane::client::ApiLocks`].
|
||||
pub const DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK: &'static str =
|
||||
"shards=64,permits=100,epoch=10m,timeout=10ms";
|
||||
|
||||
|
||||
@@ -9,16 +9,17 @@ use tokio_postgres::config::SslMode;
|
||||
use tokio_postgres::Client;
|
||||
use tracing::{error, info, info_span, warn, Instrument};
|
||||
|
||||
use super::errors::{ApiError, GetAuthInfoError, WakeComputeError};
|
||||
use super::{AuthInfo, AuthSecret, CachedNodeInfo, NodeInfo};
|
||||
use crate::auth::backend::jwt::AuthRule;
|
||||
use crate::auth::backend::ComputeUserInfo;
|
||||
use crate::auth::IpPattern;
|
||||
use crate::cache::Cached;
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::control_plane::errors::GetEndpointJwksError;
|
||||
use crate::control_plane::client::{CachedAllowedIps, CachedRoleSecret};
|
||||
use crate::control_plane::errors::{
|
||||
ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError,
|
||||
};
|
||||
use crate::control_plane::messages::MetricsAuxInfo;
|
||||
use crate::control_plane::provider::{CachedAllowedIps, CachedRoleSecret};
|
||||
use crate::control_plane::{AuthInfo, AuthSecret, CachedNodeInfo, NodeInfo};
|
||||
use crate::error::io_error;
|
||||
use crate::intern::RoleNameInt;
|
||||
use crate::types::{BranchId, EndpointId, ProjectId, RoleName};
|
||||
@@ -31,25 +32,25 @@ enum MockApiError {
|
||||
PasswordNotSet(tokio_postgres::Error),
|
||||
}
|
||||
|
||||
impl From<MockApiError> for ApiError {
|
||||
impl From<MockApiError> for ControlPlaneError {
|
||||
fn from(e: MockApiError) -> Self {
|
||||
io_error(e).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<tokio_postgres::Error> for ApiError {
|
||||
impl From<tokio_postgres::Error> for ControlPlaneError {
|
||||
fn from(e: tokio_postgres::Error) -> Self {
|
||||
io_error(e).into()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Api {
|
||||
pub struct MockControlPlane {
|
||||
endpoint: ApiUrl,
|
||||
ip_allowlist_check_enabled: bool,
|
||||
}
|
||||
|
||||
impl Api {
|
||||
impl MockControlPlane {
|
||||
pub fn new(endpoint: ApiUrl, ip_allowlist_check_enabled: bool) -> Self {
|
||||
Self {
|
||||
endpoint,
|
||||
@@ -201,7 +202,7 @@ async fn get_execute_postgres_query(
|
||||
Ok(Some(entry))
|
||||
}
|
||||
|
||||
impl super::Api for Api {
|
||||
impl super::ControlPlaneApi for MockControlPlane {
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn get_role_secret(
|
||||
&self,
|
||||
281
proxy/src/control_plane/client/mod.rs
Normal file
281
proxy/src/control_plane/client/mod.rs
Normal file
@@ -0,0 +1,281 @@
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub mod mock;
|
||||
pub mod neon;
|
||||
|
||||
use std::hash::Hash;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use dashmap::DashMap;
|
||||
use tokio::time::Instant;
|
||||
use tracing::info;
|
||||
|
||||
use crate::auth::backend::jwt::{AuthRule, FetchAuthRules, FetchAuthRulesError};
|
||||
use crate::auth::backend::ComputeUserInfo;
|
||||
use crate::cache::endpoints::EndpointsCache;
|
||||
use crate::cache::project_info::ProjectInfoCacheImpl;
|
||||
use crate::config::{CacheOptions, EndpointCacheConfig, ProjectInfoCacheOptions};
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::control_plane::{
|
||||
errors, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret, ControlPlaneApi, NodeInfoCache,
|
||||
};
|
||||
use crate::error::ReportableError;
|
||||
use crate::metrics::ApiLockMetrics;
|
||||
use crate::rate_limiter::{DynamicLimiter, Outcome, RateLimiterConfig, Token};
|
||||
use crate::types::EndpointId;
|
||||
|
||||
#[non_exhaustive]
|
||||
#[derive(Clone)]
|
||||
pub enum ControlPlaneClient {
|
||||
/// Current Management API (V2).
|
||||
Neon(neon::NeonControlPlaneClient),
|
||||
/// Local mock control plane.
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
PostgresMock(mock::MockControlPlane),
|
||||
/// Internal testing
|
||||
#[cfg(test)]
|
||||
#[allow(private_interfaces)]
|
||||
Test(Box<dyn TestControlPlaneClient>),
|
||||
}
|
||||
|
||||
impl ControlPlaneApi for ControlPlaneClient {
|
||||
async fn get_role_secret(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedRoleSecret, errors::GetAuthInfoError> {
|
||||
match self {
|
||||
Self::Neon(api) => api.get_role_secret(ctx, user_info).await,
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
Self::PostgresMock(api) => api.get_role_secret(ctx, user_info).await,
|
||||
#[cfg(test)]
|
||||
Self::Test(_) => {
|
||||
unreachable!("this function should never be called in the test backend")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_allowed_ips_and_secret(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), errors::GetAuthInfoError> {
|
||||
match self {
|
||||
Self::Neon(api) => api.get_allowed_ips_and_secret(ctx, user_info).await,
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
Self::PostgresMock(api) => api.get_allowed_ips_and_secret(ctx, user_info).await,
|
||||
#[cfg(test)]
|
||||
Self::Test(api) => api.get_allowed_ips_and_secret(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_endpoint_jwks(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
endpoint: EndpointId,
|
||||
) -> Result<Vec<AuthRule>, errors::GetEndpointJwksError> {
|
||||
match self {
|
||||
Self::Neon(api) => api.get_endpoint_jwks(ctx, endpoint).await,
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
Self::PostgresMock(api) => api.get_endpoint_jwks(ctx, endpoint).await,
|
||||
#[cfg(test)]
|
||||
Self::Test(_api) => Ok(vec![]),
|
||||
}
|
||||
}
|
||||
|
||||
async fn wake_compute(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedNodeInfo, errors::WakeComputeError> {
|
||||
match self {
|
||||
Self::Neon(api) => api.wake_compute(ctx, user_info).await,
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
Self::PostgresMock(api) => api.wake_compute(ctx, user_info).await,
|
||||
#[cfg(test)]
|
||||
Self::Test(api) => api.wake_compute(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) trait TestControlPlaneClient: Send + Sync + 'static {
|
||||
fn wake_compute(&self) -> Result<CachedNodeInfo, errors::WakeComputeError>;
|
||||
|
||||
fn get_allowed_ips_and_secret(
|
||||
&self,
|
||||
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), errors::GetAuthInfoError>;
|
||||
|
||||
fn dyn_clone(&self) -> Box<dyn TestControlPlaneClient>;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl Clone for Box<dyn TestControlPlaneClient> {
|
||||
fn clone(&self) -> Self {
|
||||
TestControlPlaneClient::dyn_clone(&**self)
|
||||
}
|
||||
}
|
||||
|
||||
/// Various caches for [`control_plane`](super).
|
||||
pub struct ApiCaches {
|
||||
/// Cache for the `wake_compute` API method.
|
||||
pub(crate) node_info: NodeInfoCache,
|
||||
/// Cache which stores project_id -> endpoint_ids mapping.
|
||||
pub project_info: Arc<ProjectInfoCacheImpl>,
|
||||
/// List of all valid endpoints.
|
||||
pub endpoints_cache: Arc<EndpointsCache>,
|
||||
}
|
||||
|
||||
impl ApiCaches {
|
||||
pub fn new(
|
||||
wake_compute_cache_config: CacheOptions,
|
||||
project_info_cache_config: ProjectInfoCacheOptions,
|
||||
endpoint_cache_config: EndpointCacheConfig,
|
||||
) -> Self {
|
||||
Self {
|
||||
node_info: NodeInfoCache::new(
|
||||
"node_info_cache",
|
||||
wake_compute_cache_config.size,
|
||||
wake_compute_cache_config.ttl,
|
||||
true,
|
||||
),
|
||||
project_info: Arc::new(ProjectInfoCacheImpl::new(project_info_cache_config)),
|
||||
endpoints_cache: Arc::new(EndpointsCache::new(endpoint_cache_config)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Various caches for [`control_plane`](super).
|
||||
pub struct ApiLocks<K> {
|
||||
name: &'static str,
|
||||
node_locks: DashMap<K, Arc<DynamicLimiter>>,
|
||||
config: RateLimiterConfig,
|
||||
timeout: Duration,
|
||||
epoch: std::time::Duration,
|
||||
metrics: &'static ApiLockMetrics,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum ApiLockError {
|
||||
#[error("timeout acquiring resource permit")]
|
||||
TimeoutError(#[from] tokio::time::error::Elapsed),
|
||||
}
|
||||
|
||||
impl ReportableError for ApiLockError {
|
||||
fn get_error_kind(&self) -> crate::error::ErrorKind {
|
||||
match self {
|
||||
ApiLockError::TimeoutError(_) => crate::error::ErrorKind::RateLimit,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<K: Hash + Eq + Clone> ApiLocks<K> {
|
||||
pub fn new(
|
||||
name: &'static str,
|
||||
config: RateLimiterConfig,
|
||||
shards: usize,
|
||||
timeout: Duration,
|
||||
epoch: std::time::Duration,
|
||||
metrics: &'static ApiLockMetrics,
|
||||
) -> prometheus::Result<Self> {
|
||||
Ok(Self {
|
||||
name,
|
||||
node_locks: DashMap::with_shard_amount(shards),
|
||||
config,
|
||||
timeout,
|
||||
epoch,
|
||||
metrics,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn get_permit(&self, key: &K) -> Result<WakeComputePermit, ApiLockError> {
|
||||
if self.config.initial_limit == 0 {
|
||||
return Ok(WakeComputePermit {
|
||||
permit: Token::disabled(),
|
||||
});
|
||||
}
|
||||
let now = Instant::now();
|
||||
let semaphore = {
|
||||
// get fast path
|
||||
if let Some(semaphore) = self.node_locks.get(key) {
|
||||
semaphore.clone()
|
||||
} else {
|
||||
self.node_locks
|
||||
.entry(key.clone())
|
||||
.or_insert_with(|| {
|
||||
self.metrics.semaphores_registered.inc();
|
||||
DynamicLimiter::new(self.config)
|
||||
})
|
||||
.clone()
|
||||
}
|
||||
};
|
||||
let permit = semaphore.acquire_timeout(self.timeout).await;
|
||||
|
||||
self.metrics
|
||||
.semaphore_acquire_seconds
|
||||
.observe(now.elapsed().as_secs_f64());
|
||||
info!("acquired permit {:?}", now.elapsed().as_secs_f64());
|
||||
Ok(WakeComputePermit { permit: permit? })
|
||||
}
|
||||
|
||||
pub async fn garbage_collect_worker(&self) {
|
||||
if self.config.initial_limit == 0 {
|
||||
return;
|
||||
}
|
||||
let mut interval =
|
||||
tokio::time::interval(self.epoch / (self.node_locks.shards().len()) as u32);
|
||||
loop {
|
||||
for (i, shard) in self.node_locks.shards().iter().enumerate() {
|
||||
interval.tick().await;
|
||||
// temporary lock a single shard and then clear any semaphores that aren't currently checked out
|
||||
// race conditions: if strong_count == 1, there's no way that it can increase while the shard is locked
|
||||
// therefore releasing it is safe from race conditions
|
||||
info!(
|
||||
name = self.name,
|
||||
shard = i,
|
||||
"performing epoch reclamation on api lock"
|
||||
);
|
||||
let mut lock = shard.write();
|
||||
let timer = self.metrics.reclamation_lag_seconds.start_timer();
|
||||
let count = lock
|
||||
.extract_if(|_, semaphore| Arc::strong_count(semaphore.get_mut()) == 1)
|
||||
.count();
|
||||
drop(lock);
|
||||
self.metrics.semaphores_unregistered.inc_by(count as u64);
|
||||
timer.observe();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct WakeComputePermit {
|
||||
permit: Token,
|
||||
}
|
||||
|
||||
impl WakeComputePermit {
|
||||
pub(crate) fn should_check_cache(&self) -> bool {
|
||||
!self.permit.is_disabled()
|
||||
}
|
||||
pub(crate) fn release(self, outcome: Outcome) {
|
||||
self.permit.release(outcome);
|
||||
}
|
||||
pub(crate) fn release_result<T, E>(self, res: Result<T, E>) -> Result<T, E> {
|
||||
match res {
|
||||
Ok(_) => self.release(Outcome::Success),
|
||||
Err(_) => self.release(Outcome::Overload),
|
||||
}
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
impl FetchAuthRules for ControlPlaneClient {
|
||||
async fn fetch_auth_rules(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
endpoint: EndpointId,
|
||||
) -> Result<Vec<AuthRule>, FetchAuthRulesError> {
|
||||
self.get_endpoint_jwks(ctx, endpoint)
|
||||
.await
|
||||
.map_err(FetchAuthRulesError::GetEndpointJwks)
|
||||
}
|
||||
}
|
||||
@@ -10,18 +10,20 @@ use tokio::time::Instant;
|
||||
use tokio_postgres::config::SslMode;
|
||||
use tracing::{debug, info, info_span, warn, Instrument};
|
||||
|
||||
use super::super::messages::{ControlPlaneError, GetRoleSecret, WakeCompute};
|
||||
use super::errors::{ApiError, GetAuthInfoError, WakeComputeError};
|
||||
use super::{
|
||||
ApiCaches, ApiLocks, AuthInfo, AuthSecret, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret,
|
||||
NodeInfo,
|
||||
};
|
||||
use super::super::messages::{ControlPlaneErrorMessage, GetRoleSecret, WakeCompute};
|
||||
use crate::auth::backend::jwt::AuthRule;
|
||||
use crate::auth::backend::ComputeUserInfo;
|
||||
use crate::cache::Cached;
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::control_plane::errors::GetEndpointJwksError;
|
||||
use crate::control_plane::caches::ApiCaches;
|
||||
use crate::control_plane::errors::{
|
||||
ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError,
|
||||
};
|
||||
use crate::control_plane::locks::ApiLocks;
|
||||
use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, Reason};
|
||||
use crate::control_plane::{
|
||||
AuthInfo, AuthSecret, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret, NodeInfo,
|
||||
};
|
||||
use crate::metrics::{CacheOutcome, Metrics};
|
||||
use crate::rate_limiter::WakeComputeRateLimiter;
|
||||
use crate::types::{EndpointCacheKey, EndpointId};
|
||||
@@ -30,7 +32,7 @@ use crate::{compute, http, scram};
|
||||
const X_REQUEST_ID: HeaderName = HeaderName::from_static("x-request-id");
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Api {
|
||||
pub struct NeonControlPlaneClient {
|
||||
endpoint: http::Endpoint,
|
||||
pub caches: &'static ApiCaches,
|
||||
pub(crate) locks: &'static ApiLocks<EndpointCacheKey>,
|
||||
@@ -39,7 +41,7 @@ pub struct Api {
|
||||
jwt: Arc<str>,
|
||||
}
|
||||
|
||||
impl Api {
|
||||
impl NeonControlPlaneClient {
|
||||
/// Construct an API object containing the auth parameters.
|
||||
pub fn new(
|
||||
endpoint: http::Endpoint,
|
||||
@@ -256,7 +258,7 @@ impl Api {
|
||||
}
|
||||
}
|
||||
|
||||
impl super::Api for Api {
|
||||
impl super::ControlPlaneApi for NeonControlPlaneClient {
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn get_role_secret(
|
||||
&self,
|
||||
@@ -356,7 +358,7 @@ impl super::Api for Api {
|
||||
let (cached, info) = cached.take_value();
|
||||
let info = info.map_err(|c| {
|
||||
info!(key = &*key, "found cached wake_compute error");
|
||||
WakeComputeError::ApiError(ApiError::ControlPlane(Box::new(*c)))
|
||||
WakeComputeError::ControlPlane(ControlPlaneError::Message(Box::new(*c)))
|
||||
})?;
|
||||
|
||||
debug!(key = &*key, "found cached compute node info");
|
||||
@@ -403,9 +405,11 @@ impl super::Api for Api {
|
||||
Ok(cached.map(|()| node))
|
||||
}
|
||||
Err(err) => match err {
|
||||
WakeComputeError::ApiError(ApiError::ControlPlane(err)) => {
|
||||
WakeComputeError::ControlPlane(ControlPlaneError::Message(err)) => {
|
||||
let Some(status) = &err.status else {
|
||||
return Err(WakeComputeError::ApiError(ApiError::ControlPlane(err)));
|
||||
return Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
|
||||
err,
|
||||
)));
|
||||
};
|
||||
|
||||
let reason = status
|
||||
@@ -415,7 +419,9 @@ impl super::Api for Api {
|
||||
|
||||
// if we can retry this error, do not cache it.
|
||||
if reason.can_retry() {
|
||||
return Err(WakeComputeError::ApiError(ApiError::ControlPlane(err)));
|
||||
return Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
|
||||
err,
|
||||
)));
|
||||
}
|
||||
|
||||
// at this point, we should only have quota errors.
|
||||
@@ -430,7 +436,9 @@ impl super::Api for Api {
|
||||
Duration::from_secs(30),
|
||||
);
|
||||
|
||||
Err(WakeComputeError::ApiError(ApiError::ControlPlane(err)))
|
||||
Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
|
||||
err,
|
||||
)))
|
||||
}
|
||||
err => return Err(err),
|
||||
},
|
||||
@@ -441,7 +449,7 @@ impl super::Api for Api {
|
||||
/// Parse http response body, taking status code into account.
|
||||
async fn parse_body<T: for<'a> serde::Deserialize<'a>>(
|
||||
response: http::Response,
|
||||
) -> Result<T, ApiError> {
|
||||
) -> Result<T, ControlPlaneError> {
|
||||
let status = response.status();
|
||||
if status.is_success() {
|
||||
// We shouldn't log raw body because it may contain secrets.
|
||||
@@ -456,7 +464,7 @@ async fn parse_body<T: for<'a> serde::Deserialize<'a>>(
|
||||
// as the fact that the request itself has failed.
|
||||
let mut body = serde_json::from_slice(&s).unwrap_or_else(|e| {
|
||||
warn!("failed to parse error body: {e}");
|
||||
ControlPlaneError {
|
||||
ControlPlaneErrorMessage {
|
||||
error: "reason unclear (malformed error message)".into(),
|
||||
http_status_code: status,
|
||||
status: None,
|
||||
@@ -465,7 +473,7 @@ async fn parse_body<T: for<'a> serde::Deserialize<'a>>(
|
||||
body.http_status_code = status;
|
||||
|
||||
warn!("console responded with an error ({status}): {body:?}");
|
||||
Err(ApiError::ControlPlane(Box::new(body)))
|
||||
Err(ControlPlaneError::Message(Box::new(body)))
|
||||
}
|
||||
|
||||
fn parse_host_port(input: &str) -> Option<(&str, u16)> {
|
||||
216
proxy/src/control_plane/errors.rs
Normal file
216
proxy/src/control_plane/errors.rs
Normal file
@@ -0,0 +1,216 @@
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::control_plane::client::ApiLockError;
|
||||
use crate::control_plane::messages::{self, ControlPlaneErrorMessage, Reason};
|
||||
use crate::error::{io_error, ErrorKind, ReportableError, UserFacingError};
|
||||
use crate::proxy::retry::CouldRetry;
|
||||
|
||||
/// A go-to error message which doesn't leak any detail.
|
||||
pub(crate) const REQUEST_FAILED: &str = "Console request failed";
|
||||
|
||||
/// Common console API error.
|
||||
#[derive(Debug, Error)]
|
||||
pub(crate) enum ControlPlaneError {
|
||||
/// Error returned by the console itself.
|
||||
#[error("{REQUEST_FAILED} with {0}")]
|
||||
Message(Box<ControlPlaneErrorMessage>),
|
||||
|
||||
/// Various IO errors like broken pipe or malformed payload.
|
||||
#[error("{REQUEST_FAILED}: {0}")]
|
||||
Transport(#[from] std::io::Error),
|
||||
}
|
||||
|
||||
impl ControlPlaneError {
|
||||
/// Returns HTTP status code if it's the reason for failure.
|
||||
pub(crate) fn get_reason(&self) -> messages::Reason {
|
||||
match self {
|
||||
ControlPlaneError::Message(e) => e.get_reason(),
|
||||
ControlPlaneError::Transport(_) => messages::Reason::Unknown,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl UserFacingError for ControlPlaneError {
|
||||
fn to_string_client(&self) -> String {
|
||||
match self {
|
||||
// To minimize risks, only select errors are forwarded to users.
|
||||
ControlPlaneError::Message(c) => c.get_user_facing_message(),
|
||||
ControlPlaneError::Transport(_) => REQUEST_FAILED.to_owned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ReportableError for ControlPlaneError {
|
||||
fn get_error_kind(&self) -> crate::error::ErrorKind {
|
||||
match self {
|
||||
ControlPlaneError::Message(e) => match e.get_reason() {
|
||||
Reason::RoleProtected => ErrorKind::User,
|
||||
Reason::ResourceNotFound => ErrorKind::User,
|
||||
Reason::ProjectNotFound => ErrorKind::User,
|
||||
Reason::EndpointNotFound => ErrorKind::User,
|
||||
Reason::BranchNotFound => ErrorKind::User,
|
||||
Reason::RateLimitExceeded => ErrorKind::ServiceRateLimit,
|
||||
Reason::NonDefaultBranchComputeTimeExceeded => ErrorKind::Quota,
|
||||
Reason::ActiveTimeQuotaExceeded => ErrorKind::Quota,
|
||||
Reason::ComputeTimeQuotaExceeded => ErrorKind::Quota,
|
||||
Reason::WrittenDataQuotaExceeded => ErrorKind::Quota,
|
||||
Reason::DataTransferQuotaExceeded => ErrorKind::Quota,
|
||||
Reason::LogicalSizeQuotaExceeded => ErrorKind::Quota,
|
||||
Reason::ConcurrencyLimitReached => ErrorKind::ControlPlane,
|
||||
Reason::LockAlreadyTaken => ErrorKind::ControlPlane,
|
||||
Reason::RunningOperations => ErrorKind::ControlPlane,
|
||||
Reason::ActiveEndpointsLimitExceeded => ErrorKind::ControlPlane,
|
||||
Reason::Unknown => ErrorKind::ControlPlane,
|
||||
},
|
||||
ControlPlaneError::Transport(_) => crate::error::ErrorKind::ControlPlane,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CouldRetry for ControlPlaneError {
|
||||
fn could_retry(&self) -> bool {
|
||||
match self {
|
||||
// retry some transport errors
|
||||
Self::Transport(io) => io.could_retry(),
|
||||
Self::Message(e) => e.could_retry(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<reqwest::Error> for ControlPlaneError {
|
||||
fn from(e: reqwest::Error) -> Self {
|
||||
io_error(e).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<reqwest_middleware::Error> for ControlPlaneError {
|
||||
fn from(e: reqwest_middleware::Error) -> Self {
|
||||
io_error(e).into()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub(crate) enum GetAuthInfoError {
|
||||
// We shouldn't include the actual secret here.
|
||||
#[error("Console responded with a malformed auth secret")]
|
||||
BadSecret,
|
||||
|
||||
#[error(transparent)]
|
||||
ApiError(ControlPlaneError),
|
||||
}
|
||||
|
||||
// This allows more useful interactions than `#[from]`.
|
||||
impl<E: Into<ControlPlaneError>> From<E> for GetAuthInfoError {
|
||||
fn from(e: E) -> Self {
|
||||
Self::ApiError(e.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl UserFacingError for GetAuthInfoError {
|
||||
fn to_string_client(&self) -> String {
|
||||
match self {
|
||||
// We absolutely should not leak any secrets!
|
||||
Self::BadSecret => REQUEST_FAILED.to_owned(),
|
||||
// However, API might return a meaningful error.
|
||||
Self::ApiError(e) => e.to_string_client(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ReportableError for GetAuthInfoError {
|
||||
fn get_error_kind(&self) -> crate::error::ErrorKind {
|
||||
match self {
|
||||
Self::BadSecret => crate::error::ErrorKind::ControlPlane,
|
||||
Self::ApiError(_) => crate::error::ErrorKind::ControlPlane,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub(crate) enum WakeComputeError {
|
||||
#[error("Console responded with a malformed compute address: {0}")]
|
||||
BadComputeAddress(Box<str>),
|
||||
|
||||
#[error(transparent)]
|
||||
ControlPlane(ControlPlaneError),
|
||||
|
||||
#[error("Too many connections attempts")]
|
||||
TooManyConnections,
|
||||
|
||||
#[error("error acquiring resource permit: {0}")]
|
||||
TooManyConnectionAttempts(#[from] ApiLockError),
|
||||
}
|
||||
|
||||
// This allows more useful interactions than `#[from]`.
|
||||
impl<E: Into<ControlPlaneError>> From<E> for WakeComputeError {
|
||||
fn from(e: E) -> Self {
|
||||
Self::ControlPlane(e.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl UserFacingError for WakeComputeError {
|
||||
fn to_string_client(&self) -> String {
|
||||
match self {
|
||||
// We shouldn't show user the address even if it's broken.
|
||||
// Besides, user is unlikely to care about this detail.
|
||||
Self::BadComputeAddress(_) => REQUEST_FAILED.to_owned(),
|
||||
// However, control plane might return a meaningful error.
|
||||
Self::ControlPlane(e) => e.to_string_client(),
|
||||
|
||||
Self::TooManyConnections => self.to_string(),
|
||||
|
||||
Self::TooManyConnectionAttempts(_) => {
|
||||
"Failed to acquire permit to connect to the database. Too many database connection attempts are currently ongoing.".to_owned()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ReportableError for WakeComputeError {
|
||||
fn get_error_kind(&self) -> crate::error::ErrorKind {
|
||||
match self {
|
||||
Self::BadComputeAddress(_) => crate::error::ErrorKind::ControlPlane,
|
||||
Self::ControlPlane(e) => e.get_error_kind(),
|
||||
Self::TooManyConnections => crate::error::ErrorKind::RateLimit,
|
||||
Self::TooManyConnectionAttempts(e) => e.get_error_kind(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CouldRetry for WakeComputeError {
|
||||
fn could_retry(&self) -> bool {
|
||||
match self {
|
||||
Self::BadComputeAddress(_) => false,
|
||||
Self::ControlPlane(e) => e.could_retry(),
|
||||
Self::TooManyConnections => false,
|
||||
Self::TooManyConnectionAttempts(_) => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum GetEndpointJwksError {
|
||||
#[error("endpoint not found")]
|
||||
EndpointNotFound,
|
||||
|
||||
#[error("failed to build control plane request: {0}")]
|
||||
RequestBuild(#[source] reqwest::Error),
|
||||
|
||||
#[error("failed to send control plane request: {0}")]
|
||||
RequestExecute(#[source] reqwest_middleware::Error),
|
||||
|
||||
#[error(transparent)]
|
||||
ControlPlane(#[from] ControlPlaneError),
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
#[error(transparent)]
|
||||
TokioPostgres(#[from] tokio_postgres::Error),
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
#[error(transparent)]
|
||||
ParseUrl(#[from] url::ParseError),
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
#[error(transparent)]
|
||||
TaskJoin(#[from] tokio::task::JoinError),
|
||||
}
|
||||
@@ -10,14 +10,14 @@ use crate::proxy::retry::CouldRetry;
|
||||
/// Generic error response with human-readable description.
|
||||
/// Note that we can't always present it to user as is.
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub(crate) struct ControlPlaneError {
|
||||
pub(crate) struct ControlPlaneErrorMessage {
|
||||
pub(crate) error: Box<str>,
|
||||
#[serde(skip)]
|
||||
pub(crate) http_status_code: http::StatusCode,
|
||||
pub(crate) status: Option<Status>,
|
||||
}
|
||||
|
||||
impl ControlPlaneError {
|
||||
impl ControlPlaneErrorMessage {
|
||||
pub(crate) fn get_reason(&self) -> Reason {
|
||||
self.status
|
||||
.as_ref()
|
||||
@@ -26,7 +26,7 @@ impl ControlPlaneError {
|
||||
}
|
||||
|
||||
pub(crate) fn get_user_facing_message(&self) -> String {
|
||||
use super::provider::errors::REQUEST_FAILED;
|
||||
use super::errors::REQUEST_FAILED;
|
||||
self.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.details.user_facing_message.as_ref())
|
||||
@@ -51,7 +51,7 @@ impl ControlPlaneError {
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for ControlPlaneError {
|
||||
impl Display for ControlPlaneErrorMessage {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
let msg: &str = self
|
||||
.status
|
||||
@@ -62,7 +62,7 @@ impl Display for ControlPlaneError {
|
||||
}
|
||||
}
|
||||
|
||||
impl CouldRetry for ControlPlaneError {
|
||||
impl CouldRetry for ControlPlaneErrorMessage {
|
||||
fn could_retry(&self) -> bool {
|
||||
// If the error message does not have a status,
|
||||
// the error is unknown and probably should not retry automatically
|
||||
|
||||
@@ -5,18 +5,137 @@
|
||||
pub mod messages;
|
||||
|
||||
/// Wrappers for console APIs and their mocks.
|
||||
pub mod provider;
|
||||
pub(crate) use provider::{errors, Api, AuthSecret, CachedNodeInfo, NodeInfo};
|
||||
pub mod client;
|
||||
|
||||
pub(crate) mod errors;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::auth::backend::jwt::AuthRule;
|
||||
use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo};
|
||||
use crate::auth::IpPattern;
|
||||
use crate::cache::project_info::ProjectInfoCacheImpl;
|
||||
use crate::cache::{Cached, TimedLru};
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::control_plane::messages::{ControlPlaneErrorMessage, MetricsAuxInfo};
|
||||
use crate::intern::ProjectIdInt;
|
||||
use crate::types::{EndpointCacheKey, EndpointId};
|
||||
use crate::{compute, scram};
|
||||
|
||||
/// Various cache-related types.
|
||||
pub mod caches {
|
||||
pub use super::provider::ApiCaches;
|
||||
pub use super::client::ApiCaches;
|
||||
}
|
||||
|
||||
/// Various cache-related types.
|
||||
pub mod locks {
|
||||
pub use super::provider::ApiLocks;
|
||||
pub use super::client::ApiLocks;
|
||||
}
|
||||
|
||||
/// Console's management API.
|
||||
pub mod mgmt;
|
||||
|
||||
/// Auth secret which is managed by the cloud.
|
||||
#[derive(Clone, Eq, PartialEq, Debug)]
|
||||
pub(crate) enum AuthSecret {
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
/// Md5 hash of user's password.
|
||||
Md5([u8; 16]),
|
||||
|
||||
/// [SCRAM](crate::scram) authentication info.
|
||||
Scram(scram::ServerSecret),
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct AuthInfo {
|
||||
pub(crate) secret: Option<AuthSecret>,
|
||||
/// List of IP addresses allowed for the autorization.
|
||||
pub(crate) allowed_ips: Vec<IpPattern>,
|
||||
/// Project ID. This is used for cache invalidation.
|
||||
pub(crate) project_id: Option<ProjectIdInt>,
|
||||
}
|
||||
|
||||
/// Info for establishing a connection to a compute node.
|
||||
/// This is what we get after auth succeeded, but not before!
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct NodeInfo {
|
||||
/// Compute node connection params.
|
||||
/// It's sad that we have to clone this, but this will improve
|
||||
/// once we migrate to a bespoke connection logic.
|
||||
pub(crate) config: compute::ConnCfg,
|
||||
|
||||
/// Labels for proxy's metrics.
|
||||
pub(crate) aux: MetricsAuxInfo,
|
||||
|
||||
/// Whether we should accept self-signed certificates (for testing)
|
||||
pub(crate) allow_self_signed_compute: bool,
|
||||
}
|
||||
|
||||
impl NodeInfo {
|
||||
pub(crate) async fn connect(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
timeout: Duration,
|
||||
) -> Result<compute::PostgresConnection, compute::ConnectionError> {
|
||||
self.config
|
||||
.connect(
|
||||
ctx,
|
||||
self.allow_self_signed_compute,
|
||||
self.aux.clone(),
|
||||
timeout,
|
||||
)
|
||||
.await
|
||||
}
|
||||
pub(crate) fn reuse_settings(&mut self, other: Self) {
|
||||
self.allow_self_signed_compute = other.allow_self_signed_compute;
|
||||
self.config.reuse_password(other.config);
|
||||
}
|
||||
|
||||
pub(crate) fn set_keys(&mut self, keys: &ComputeCredentialKeys) {
|
||||
match keys {
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
ComputeCredentialKeys::Password(password) => self.config.password(password),
|
||||
ComputeCredentialKeys::AuthKeys(auth_keys) => self.config.auth_keys(*auth_keys),
|
||||
ComputeCredentialKeys::JwtPayload(_) | ComputeCredentialKeys::None => &mut self.config,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) type NodeInfoCache =
|
||||
TimedLru<EndpointCacheKey, Result<NodeInfo, Box<ControlPlaneErrorMessage>>>;
|
||||
pub(crate) type CachedNodeInfo = Cached<&'static NodeInfoCache, NodeInfo>;
|
||||
pub(crate) type CachedRoleSecret = Cached<&'static ProjectInfoCacheImpl, Option<AuthSecret>>;
|
||||
pub(crate) type CachedAllowedIps = Cached<&'static ProjectInfoCacheImpl, Arc<Vec<IpPattern>>>;
|
||||
|
||||
/// This will allocate per each call, but the http requests alone
|
||||
/// already require a few allocations, so it should be fine.
|
||||
pub(crate) trait ControlPlaneApi {
|
||||
/// Get the client's auth secret for authentication.
|
||||
/// Returns option because user not found situation is special.
|
||||
/// We still have to mock the scram to avoid leaking information that user doesn't exist.
|
||||
async fn get_role_secret(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedRoleSecret, errors::GetAuthInfoError>;
|
||||
|
||||
async fn get_allowed_ips_and_secret(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), errors::GetAuthInfoError>;
|
||||
|
||||
async fn get_endpoint_jwks(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
endpoint: EndpointId,
|
||||
) -> Result<Vec<AuthRule>, errors::GetEndpointJwksError>;
|
||||
|
||||
/// Wake up the compute node and return the corresponding connection info.
|
||||
async fn wake_compute(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedNodeInfo, errors::WakeComputeError>;
|
||||
}
|
||||
|
||||
@@ -1,588 +0,0 @@
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub mod mock;
|
||||
pub mod neon;
|
||||
|
||||
use std::hash::Hash;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use dashmap::DashMap;
|
||||
use tokio::time::Instant;
|
||||
use tracing::info;
|
||||
|
||||
use super::messages::{ControlPlaneError, MetricsAuxInfo};
|
||||
use crate::auth::backend::jwt::{AuthRule, FetchAuthRules, FetchAuthRulesError};
|
||||
use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo};
|
||||
use crate::auth::IpPattern;
|
||||
use crate::cache::endpoints::EndpointsCache;
|
||||
use crate::cache::project_info::ProjectInfoCacheImpl;
|
||||
use crate::cache::{Cached, TimedLru};
|
||||
use crate::config::{CacheOptions, EndpointCacheConfig, ProjectInfoCacheOptions};
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::error::ReportableError;
|
||||
use crate::intern::ProjectIdInt;
|
||||
use crate::metrics::ApiLockMetrics;
|
||||
use crate::rate_limiter::{DynamicLimiter, Outcome, RateLimiterConfig, Token};
|
||||
use crate::types::{EndpointCacheKey, EndpointId};
|
||||
use crate::{compute, scram};
|
||||
|
||||
pub(crate) mod errors {
|
||||
use thiserror::Error;
|
||||
|
||||
use super::ApiLockError;
|
||||
use crate::control_plane::messages::{self, ControlPlaneError, Reason};
|
||||
use crate::error::{io_error, ErrorKind, ReportableError, UserFacingError};
|
||||
use crate::proxy::retry::CouldRetry;
|
||||
|
||||
/// A go-to error message which doesn't leak any detail.
|
||||
pub(crate) const REQUEST_FAILED: &str = "Console request failed";
|
||||
|
||||
/// Common console API error.
|
||||
#[derive(Debug, Error)]
|
||||
pub(crate) enum ApiError {
|
||||
/// Error returned by the console itself.
|
||||
#[error("{REQUEST_FAILED} with {0}")]
|
||||
ControlPlane(Box<ControlPlaneError>),
|
||||
|
||||
/// Various IO errors like broken pipe or malformed payload.
|
||||
#[error("{REQUEST_FAILED}: {0}")]
|
||||
Transport(#[from] std::io::Error),
|
||||
}
|
||||
|
||||
impl ApiError {
|
||||
/// Returns HTTP status code if it's the reason for failure.
|
||||
pub(crate) fn get_reason(&self) -> messages::Reason {
|
||||
match self {
|
||||
ApiError::ControlPlane(e) => e.get_reason(),
|
||||
ApiError::Transport(_) => messages::Reason::Unknown,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl UserFacingError for ApiError {
|
||||
fn to_string_client(&self) -> String {
|
||||
match self {
|
||||
// To minimize risks, only select errors are forwarded to users.
|
||||
ApiError::ControlPlane(c) => c.get_user_facing_message(),
|
||||
ApiError::Transport(_) => REQUEST_FAILED.to_owned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ReportableError for ApiError {
|
||||
fn get_error_kind(&self) -> crate::error::ErrorKind {
|
||||
match self {
|
||||
ApiError::ControlPlane(e) => match e.get_reason() {
|
||||
Reason::RoleProtected => ErrorKind::User,
|
||||
Reason::ResourceNotFound => ErrorKind::User,
|
||||
Reason::ProjectNotFound => ErrorKind::User,
|
||||
Reason::EndpointNotFound => ErrorKind::User,
|
||||
Reason::BranchNotFound => ErrorKind::User,
|
||||
Reason::RateLimitExceeded => ErrorKind::ServiceRateLimit,
|
||||
Reason::NonDefaultBranchComputeTimeExceeded => ErrorKind::Quota,
|
||||
Reason::ActiveTimeQuotaExceeded => ErrorKind::Quota,
|
||||
Reason::ComputeTimeQuotaExceeded => ErrorKind::Quota,
|
||||
Reason::WrittenDataQuotaExceeded => ErrorKind::Quota,
|
||||
Reason::DataTransferQuotaExceeded => ErrorKind::Quota,
|
||||
Reason::LogicalSizeQuotaExceeded => ErrorKind::Quota,
|
||||
Reason::ConcurrencyLimitReached => ErrorKind::ControlPlane,
|
||||
Reason::LockAlreadyTaken => ErrorKind::ControlPlane,
|
||||
Reason::RunningOperations => ErrorKind::ControlPlane,
|
||||
Reason::ActiveEndpointsLimitExceeded => ErrorKind::ControlPlane,
|
||||
Reason::Unknown => ErrorKind::ControlPlane,
|
||||
},
|
||||
ApiError::Transport(_) => crate::error::ErrorKind::ControlPlane,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CouldRetry for ApiError {
|
||||
fn could_retry(&self) -> bool {
|
||||
match self {
|
||||
// retry some transport errors
|
||||
Self::Transport(io) => io.could_retry(),
|
||||
Self::ControlPlane(e) => e.could_retry(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<reqwest::Error> for ApiError {
|
||||
fn from(e: reqwest::Error) -> Self {
|
||||
io_error(e).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<reqwest_middleware::Error> for ApiError {
|
||||
fn from(e: reqwest_middleware::Error) -> Self {
|
||||
io_error(e).into()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub(crate) enum GetAuthInfoError {
|
||||
// We shouldn't include the actual secret here.
|
||||
#[error("Console responded with a malformed auth secret")]
|
||||
BadSecret,
|
||||
|
||||
#[error(transparent)]
|
||||
ApiError(ApiError),
|
||||
}
|
||||
|
||||
// This allows more useful interactions than `#[from]`.
|
||||
impl<E: Into<ApiError>> From<E> for GetAuthInfoError {
|
||||
fn from(e: E) -> Self {
|
||||
Self::ApiError(e.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl UserFacingError for GetAuthInfoError {
|
||||
fn to_string_client(&self) -> String {
|
||||
match self {
|
||||
// We absolutely should not leak any secrets!
|
||||
Self::BadSecret => REQUEST_FAILED.to_owned(),
|
||||
// However, API might return a meaningful error.
|
||||
Self::ApiError(e) => e.to_string_client(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ReportableError for GetAuthInfoError {
|
||||
fn get_error_kind(&self) -> crate::error::ErrorKind {
|
||||
match self {
|
||||
Self::BadSecret => crate::error::ErrorKind::ControlPlane,
|
||||
Self::ApiError(_) => crate::error::ErrorKind::ControlPlane,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub(crate) enum WakeComputeError {
|
||||
#[error("Console responded with a malformed compute address: {0}")]
|
||||
BadComputeAddress(Box<str>),
|
||||
|
||||
#[error(transparent)]
|
||||
ApiError(ApiError),
|
||||
|
||||
#[error("Too many connections attempts")]
|
||||
TooManyConnections,
|
||||
|
||||
#[error("error acquiring resource permit: {0}")]
|
||||
TooManyConnectionAttempts(#[from] ApiLockError),
|
||||
}
|
||||
|
||||
// This allows more useful interactions than `#[from]`.
|
||||
impl<E: Into<ApiError>> From<E> for WakeComputeError {
|
||||
fn from(e: E) -> Self {
|
||||
Self::ApiError(e.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl UserFacingError for WakeComputeError {
|
||||
fn to_string_client(&self) -> String {
|
||||
match self {
|
||||
// We shouldn't show user the address even if it's broken.
|
||||
// Besides, user is unlikely to care about this detail.
|
||||
Self::BadComputeAddress(_) => REQUEST_FAILED.to_owned(),
|
||||
// However, API might return a meaningful error.
|
||||
Self::ApiError(e) => e.to_string_client(),
|
||||
|
||||
Self::TooManyConnections => self.to_string(),
|
||||
|
||||
Self::TooManyConnectionAttempts(_) => {
|
||||
"Failed to acquire permit to connect to the database. Too many database connection attempts are currently ongoing.".to_owned()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ReportableError for WakeComputeError {
|
||||
fn get_error_kind(&self) -> crate::error::ErrorKind {
|
||||
match self {
|
||||
Self::BadComputeAddress(_) => crate::error::ErrorKind::ControlPlane,
|
||||
Self::ApiError(e) => e.get_error_kind(),
|
||||
Self::TooManyConnections => crate::error::ErrorKind::RateLimit,
|
||||
Self::TooManyConnectionAttempts(e) => e.get_error_kind(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CouldRetry for WakeComputeError {
|
||||
fn could_retry(&self) -> bool {
|
||||
match self {
|
||||
Self::BadComputeAddress(_) => false,
|
||||
Self::ApiError(e) => e.could_retry(),
|
||||
Self::TooManyConnections => false,
|
||||
Self::TooManyConnectionAttempts(_) => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum GetEndpointJwksError {
|
||||
#[error("endpoint not found")]
|
||||
EndpointNotFound,
|
||||
|
||||
#[error("failed to build control plane request: {0}")]
|
||||
RequestBuild(#[source] reqwest::Error),
|
||||
|
||||
#[error("failed to send control plane request: {0}")]
|
||||
RequestExecute(#[source] reqwest_middleware::Error),
|
||||
|
||||
#[error(transparent)]
|
||||
ControlPlane(#[from] ApiError),
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
#[error(transparent)]
|
||||
TokioPostgres(#[from] tokio_postgres::Error),
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
#[error(transparent)]
|
||||
ParseUrl(#[from] url::ParseError),
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
#[error(transparent)]
|
||||
TaskJoin(#[from] tokio::task::JoinError),
|
||||
}
|
||||
}
|
||||
|
||||
/// Auth secret which is managed by the cloud.
|
||||
#[derive(Clone, Eq, PartialEq, Debug)]
|
||||
pub(crate) enum AuthSecret {
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
/// Md5 hash of user's password.
|
||||
Md5([u8; 16]),
|
||||
|
||||
/// [SCRAM](crate::scram) authentication info.
|
||||
Scram(scram::ServerSecret),
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct AuthInfo {
|
||||
pub(crate) secret: Option<AuthSecret>,
|
||||
/// List of IP addresses allowed for the autorization.
|
||||
pub(crate) allowed_ips: Vec<IpPattern>,
|
||||
/// Project ID. This is used for cache invalidation.
|
||||
pub(crate) project_id: Option<ProjectIdInt>,
|
||||
}
|
||||
|
||||
/// Info for establishing a connection to a compute node.
|
||||
/// This is what we get after auth succeeded, but not before!
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct NodeInfo {
|
||||
/// Compute node connection params.
|
||||
/// It's sad that we have to clone this, but this will improve
|
||||
/// once we migrate to a bespoke connection logic.
|
||||
pub(crate) config: compute::ConnCfg,
|
||||
|
||||
/// Labels for proxy's metrics.
|
||||
pub(crate) aux: MetricsAuxInfo,
|
||||
|
||||
/// Whether we should accept self-signed certificates (for testing)
|
||||
pub(crate) allow_self_signed_compute: bool,
|
||||
}
|
||||
|
||||
impl NodeInfo {
|
||||
pub(crate) async fn connect(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
timeout: Duration,
|
||||
) -> Result<compute::PostgresConnection, compute::ConnectionError> {
|
||||
self.config
|
||||
.connect(
|
||||
ctx,
|
||||
self.allow_self_signed_compute,
|
||||
self.aux.clone(),
|
||||
timeout,
|
||||
)
|
||||
.await
|
||||
}
|
||||
pub(crate) fn reuse_settings(&mut self, other: Self) {
|
||||
self.allow_self_signed_compute = other.allow_self_signed_compute;
|
||||
self.config.reuse_password(other.config);
|
||||
}
|
||||
|
||||
pub(crate) fn set_keys(&mut self, keys: &ComputeCredentialKeys) {
|
||||
match keys {
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
ComputeCredentialKeys::Password(password) => self.config.password(password),
|
||||
ComputeCredentialKeys::AuthKeys(auth_keys) => self.config.auth_keys(*auth_keys),
|
||||
ComputeCredentialKeys::JwtPayload(_) | ComputeCredentialKeys::None => &mut self.config,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) type NodeInfoCache =
|
||||
TimedLru<EndpointCacheKey, Result<NodeInfo, Box<ControlPlaneError>>>;
|
||||
pub(crate) type CachedNodeInfo = Cached<&'static NodeInfoCache, NodeInfo>;
|
||||
pub(crate) type CachedRoleSecret = Cached<&'static ProjectInfoCacheImpl, Option<AuthSecret>>;
|
||||
pub(crate) type CachedAllowedIps = Cached<&'static ProjectInfoCacheImpl, Arc<Vec<IpPattern>>>;
|
||||
|
||||
/// This will allocate per each call, but the http requests alone
|
||||
/// already require a few allocations, so it should be fine.
|
||||
pub(crate) trait Api {
|
||||
/// Get the client's auth secret for authentication.
|
||||
/// Returns option because user not found situation is special.
|
||||
/// We still have to mock the scram to avoid leaking information that user doesn't exist.
|
||||
async fn get_role_secret(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedRoleSecret, errors::GetAuthInfoError>;
|
||||
|
||||
async fn get_allowed_ips_and_secret(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), errors::GetAuthInfoError>;
|
||||
|
||||
async fn get_endpoint_jwks(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
endpoint: EndpointId,
|
||||
) -> Result<Vec<AuthRule>, errors::GetEndpointJwksError>;
|
||||
|
||||
/// Wake up the compute node and return the corresponding connection info.
|
||||
async fn wake_compute(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedNodeInfo, errors::WakeComputeError>;
|
||||
}
|
||||
|
||||
#[non_exhaustive]
|
||||
#[derive(Clone)]
|
||||
pub enum ControlPlaneBackend {
|
||||
/// Current Management API (V2).
|
||||
Management(neon::Api),
|
||||
/// Local mock control plane.
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
PostgresMock(mock::Api),
|
||||
/// Internal testing
|
||||
#[cfg(test)]
|
||||
#[allow(private_interfaces)]
|
||||
Test(Box<dyn crate::auth::backend::TestBackend>),
|
||||
}
|
||||
|
||||
impl Api for ControlPlaneBackend {
|
||||
async fn get_role_secret(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedRoleSecret, errors::GetAuthInfoError> {
|
||||
match self {
|
||||
Self::Management(api) => api.get_role_secret(ctx, user_info).await,
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
Self::PostgresMock(api) => api.get_role_secret(ctx, user_info).await,
|
||||
#[cfg(test)]
|
||||
Self::Test(_) => {
|
||||
unreachable!("this function should never be called in the test backend")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_allowed_ips_and_secret(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), errors::GetAuthInfoError> {
|
||||
match self {
|
||||
Self::Management(api) => api.get_allowed_ips_and_secret(ctx, user_info).await,
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
Self::PostgresMock(api) => api.get_allowed_ips_and_secret(ctx, user_info).await,
|
||||
#[cfg(test)]
|
||||
Self::Test(api) => api.get_allowed_ips_and_secret(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_endpoint_jwks(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
endpoint: EndpointId,
|
||||
) -> Result<Vec<AuthRule>, errors::GetEndpointJwksError> {
|
||||
match self {
|
||||
Self::Management(api) => api.get_endpoint_jwks(ctx, endpoint).await,
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
Self::PostgresMock(api) => api.get_endpoint_jwks(ctx, endpoint).await,
|
||||
#[cfg(test)]
|
||||
Self::Test(_api) => Ok(vec![]),
|
||||
}
|
||||
}
|
||||
|
||||
async fn wake_compute(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedNodeInfo, errors::WakeComputeError> {
|
||||
match self {
|
||||
Self::Management(api) => api.wake_compute(ctx, user_info).await,
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
Self::PostgresMock(api) => api.wake_compute(ctx, user_info).await,
|
||||
#[cfg(test)]
|
||||
Self::Test(api) => api.wake_compute(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Various caches for [`control_plane`](super).
|
||||
pub struct ApiCaches {
|
||||
/// Cache for the `wake_compute` API method.
|
||||
pub(crate) node_info: NodeInfoCache,
|
||||
/// Cache which stores project_id -> endpoint_ids mapping.
|
||||
pub project_info: Arc<ProjectInfoCacheImpl>,
|
||||
/// List of all valid endpoints.
|
||||
pub endpoints_cache: Arc<EndpointsCache>,
|
||||
}
|
||||
|
||||
impl ApiCaches {
|
||||
pub fn new(
|
||||
wake_compute_cache_config: CacheOptions,
|
||||
project_info_cache_config: ProjectInfoCacheOptions,
|
||||
endpoint_cache_config: EndpointCacheConfig,
|
||||
) -> Self {
|
||||
Self {
|
||||
node_info: NodeInfoCache::new(
|
||||
"node_info_cache",
|
||||
wake_compute_cache_config.size,
|
||||
wake_compute_cache_config.ttl,
|
||||
true,
|
||||
),
|
||||
project_info: Arc::new(ProjectInfoCacheImpl::new(project_info_cache_config)),
|
||||
endpoints_cache: Arc::new(EndpointsCache::new(endpoint_cache_config)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Various caches for [`control_plane`](super).
|
||||
pub struct ApiLocks<K> {
|
||||
name: &'static str,
|
||||
node_locks: DashMap<K, Arc<DynamicLimiter>>,
|
||||
config: RateLimiterConfig,
|
||||
timeout: Duration,
|
||||
epoch: std::time::Duration,
|
||||
metrics: &'static ApiLockMetrics,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum ApiLockError {
|
||||
#[error("timeout acquiring resource permit")]
|
||||
TimeoutError(#[from] tokio::time::error::Elapsed),
|
||||
}
|
||||
|
||||
impl ReportableError for ApiLockError {
|
||||
fn get_error_kind(&self) -> crate::error::ErrorKind {
|
||||
match self {
|
||||
ApiLockError::TimeoutError(_) => crate::error::ErrorKind::RateLimit,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<K: Hash + Eq + Clone> ApiLocks<K> {
|
||||
pub fn new(
|
||||
name: &'static str,
|
||||
config: RateLimiterConfig,
|
||||
shards: usize,
|
||||
timeout: Duration,
|
||||
epoch: std::time::Duration,
|
||||
metrics: &'static ApiLockMetrics,
|
||||
) -> prometheus::Result<Self> {
|
||||
Ok(Self {
|
||||
name,
|
||||
node_locks: DashMap::with_shard_amount(shards),
|
||||
config,
|
||||
timeout,
|
||||
epoch,
|
||||
metrics,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn get_permit(&self, key: &K) -> Result<WakeComputePermit, ApiLockError> {
|
||||
if self.config.initial_limit == 0 {
|
||||
return Ok(WakeComputePermit {
|
||||
permit: Token::disabled(),
|
||||
});
|
||||
}
|
||||
let now = Instant::now();
|
||||
let semaphore = {
|
||||
// get fast path
|
||||
if let Some(semaphore) = self.node_locks.get(key) {
|
||||
semaphore.clone()
|
||||
} else {
|
||||
self.node_locks
|
||||
.entry(key.clone())
|
||||
.or_insert_with(|| {
|
||||
self.metrics.semaphores_registered.inc();
|
||||
DynamicLimiter::new(self.config)
|
||||
})
|
||||
.clone()
|
||||
}
|
||||
};
|
||||
let permit = semaphore.acquire_timeout(self.timeout).await;
|
||||
|
||||
self.metrics
|
||||
.semaphore_acquire_seconds
|
||||
.observe(now.elapsed().as_secs_f64());
|
||||
info!("acquired permit {:?}", now.elapsed().as_secs_f64());
|
||||
Ok(WakeComputePermit { permit: permit? })
|
||||
}
|
||||
|
||||
pub async fn garbage_collect_worker(&self) {
|
||||
if self.config.initial_limit == 0 {
|
||||
return;
|
||||
}
|
||||
let mut interval =
|
||||
tokio::time::interval(self.epoch / (self.node_locks.shards().len()) as u32);
|
||||
loop {
|
||||
for (i, shard) in self.node_locks.shards().iter().enumerate() {
|
||||
interval.tick().await;
|
||||
// temporary lock a single shard and then clear any semaphores that aren't currently checked out
|
||||
// race conditions: if strong_count == 1, there's no way that it can increase while the shard is locked
|
||||
// therefore releasing it is safe from race conditions
|
||||
info!(
|
||||
name = self.name,
|
||||
shard = i,
|
||||
"performing epoch reclamation on api lock"
|
||||
);
|
||||
let mut lock = shard.write();
|
||||
let timer = self.metrics.reclamation_lag_seconds.start_timer();
|
||||
let count = lock
|
||||
.extract_if(|_, semaphore| Arc::strong_count(semaphore.get_mut()) == 1)
|
||||
.count();
|
||||
drop(lock);
|
||||
self.metrics.semaphores_unregistered.inc_by(count as u64);
|
||||
timer.observe();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct WakeComputePermit {
|
||||
permit: Token,
|
||||
}
|
||||
|
||||
impl WakeComputePermit {
|
||||
pub(crate) fn should_check_cache(&self) -> bool {
|
||||
!self.permit.is_disabled()
|
||||
}
|
||||
pub(crate) fn release(self, outcome: Outcome) {
|
||||
self.permit.release(outcome);
|
||||
}
|
||||
pub(crate) fn release_result<T, E>(self, res: Result<T, E>) -> Result<T, E> {
|
||||
match res {
|
||||
Ok(_) => self.release(Outcome::Success),
|
||||
Err(_) => self.release(Outcome::Overload),
|
||||
}
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
impl FetchAuthRules for ControlPlaneBackend {
|
||||
async fn fetch_auth_rules(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
endpoint: EndpointId,
|
||||
) -> Result<Vec<AuthRule>, FetchAuthRulesError> {
|
||||
self.get_endpoint_jwks(ctx, endpoint)
|
||||
.await
|
||||
.map_err(FetchAuthRulesError::GetEndpointJwks)
|
||||
}
|
||||
}
|
||||
@@ -20,14 +20,14 @@ use super::connect_compute::ConnectMechanism;
|
||||
use super::retry::CouldRetry;
|
||||
use super::*;
|
||||
use crate::auth::backend::{
|
||||
ComputeCredentialKeys, ComputeCredentials, ComputeUserInfo, MaybeOwned, TestBackend,
|
||||
ComputeCredentialKeys, ComputeCredentials, ComputeUserInfo, MaybeOwned,
|
||||
};
|
||||
use crate::config::{CertResolver, RetryConfig};
|
||||
use crate::control_plane::messages::{ControlPlaneError, Details, MetricsAuxInfo, Status};
|
||||
use crate::control_plane::provider::{
|
||||
CachedAllowedIps, CachedRoleSecret, ControlPlaneBackend, NodeInfoCache,
|
||||
use crate::control_plane::client::{ControlPlaneClient, TestControlPlaneClient};
|
||||
use crate::control_plane::messages::{ControlPlaneErrorMessage, Details, MetricsAuxInfo, Status};
|
||||
use crate::control_plane::{
|
||||
self, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret, NodeInfo, NodeInfoCache,
|
||||
};
|
||||
use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
|
||||
use crate::error::ErrorKind;
|
||||
use crate::types::{BranchId, EndpointId, ProjectId};
|
||||
use crate::{sasl, scram};
|
||||
@@ -490,7 +490,7 @@ impl ConnectMechanism for TestConnectMechanism {
|
||||
fn update_connect_config(&self, _conf: &mut compute::ConnCfg) {}
|
||||
}
|
||||
|
||||
impl TestBackend for TestConnectMechanism {
|
||||
impl TestControlPlaneClient for TestConnectMechanism {
|
||||
fn wake_compute(&self) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError> {
|
||||
let mut counter = self.counter.lock().unwrap();
|
||||
let action = self.sequence[*counter];
|
||||
@@ -498,18 +498,19 @@ impl TestBackend for TestConnectMechanism {
|
||||
match action {
|
||||
ConnectAction::Wake => Ok(helper_create_cached_node_info(self.cache)),
|
||||
ConnectAction::WakeFail => {
|
||||
let err =
|
||||
control_plane::errors::ApiError::ControlPlane(Box::new(ControlPlaneError {
|
||||
let err = control_plane::errors::ControlPlaneError::Message(Box::new(
|
||||
ControlPlaneErrorMessage {
|
||||
http_status_code: StatusCode::BAD_REQUEST,
|
||||
error: "TEST".into(),
|
||||
status: None,
|
||||
}));
|
||||
},
|
||||
));
|
||||
assert!(!err.could_retry());
|
||||
Err(control_plane::errors::WakeComputeError::ApiError(err))
|
||||
Err(control_plane::errors::WakeComputeError::ControlPlane(err))
|
||||
}
|
||||
ConnectAction::WakeRetry => {
|
||||
let err =
|
||||
control_plane::errors::ApiError::ControlPlane(Box::new(ControlPlaneError {
|
||||
let err = control_plane::errors::ControlPlaneError::Message(Box::new(
|
||||
ControlPlaneErrorMessage {
|
||||
http_status_code: StatusCode::BAD_REQUEST,
|
||||
error: "TEST".into(),
|
||||
status: Some(Status {
|
||||
@@ -523,9 +524,10 @@ impl TestBackend for TestConnectMechanism {
|
||||
user_facing_message: None,
|
||||
},
|
||||
}),
|
||||
}));
|
||||
},
|
||||
));
|
||||
assert!(err.could_retry());
|
||||
Err(control_plane::errors::WakeComputeError::ApiError(err))
|
||||
Err(control_plane::errors::WakeComputeError::ControlPlane(err))
|
||||
}
|
||||
x => panic!("expecting action {x:?}, wake_compute is called instead"),
|
||||
}
|
||||
@@ -538,7 +540,7 @@ impl TestBackend for TestConnectMechanism {
|
||||
unimplemented!("not used in tests")
|
||||
}
|
||||
|
||||
fn dyn_clone(&self) -> Box<dyn TestBackend> {
|
||||
fn dyn_clone(&self) -> Box<dyn TestControlPlaneClient> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
}
|
||||
@@ -562,7 +564,7 @@ fn helper_create_connect_info(
|
||||
mechanism: &TestConnectMechanism,
|
||||
) -> auth::Backend<'static, ComputeCredentials> {
|
||||
let user_info = auth::Backend::ControlPlane(
|
||||
MaybeOwned::Owned(ControlPlaneBackend::Test(Box::new(mechanism.clone()))),
|
||||
MaybeOwned::Owned(ControlPlaneClient::Test(Box::new(mechanism.clone()))),
|
||||
ComputeCredentials {
|
||||
info: ComputeUserInfo {
|
||||
endpoint: "endpoint".into(),
|
||||
|
||||
@@ -4,7 +4,7 @@ use super::connect_compute::ComputeConnectBackend;
|
||||
use crate::config::RetryConfig;
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::control_plane::errors::WakeComputeError;
|
||||
use crate::control_plane::provider::CachedNodeInfo;
|
||||
use crate::control_plane::CachedNodeInfo;
|
||||
use crate::error::ReportableError;
|
||||
use crate::metrics::{
|
||||
ConnectOutcome, ConnectionFailuresBreakdownGroup, Metrics, RetriesMetricGroup, RetryType,
|
||||
|
||||
@@ -24,9 +24,9 @@ use crate::compute_ctl::{
|
||||
};
|
||||
use crate::config::ProxyConfig;
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::control_plane::client::ApiLockError;
|
||||
use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError};
|
||||
use crate::control_plane::locks::ApiLocks;
|
||||
use crate::control_plane::provider::ApiLockError;
|
||||
use crate::control_plane::CachedNodeInfo;
|
||||
use crate::error::{ErrorKind, ReportableError, UserFacingError};
|
||||
use crate::intern::EndpointIdInt;
|
||||
|
||||
Reference in New Issue
Block a user