From e65d5f73697a55bac9966c9cb91124ba5cadaaaa Mon Sep 17 00:00:00 2001 From: Folke Behrens Date: Mon, 7 Jul 2025 19:46:33 +0200 Subject: [PATCH] proxy: Remove the endpoint filter cache (#12488) ## Problem The endpoint filter cache is still unused because it's not yet reliable enough to be used. It only consumes a lot of memory. ## Summary of changes Remove the code. Needs a new design. neondatabase/cloud#30634 --- proxy/src/binary/proxy.rs | 23 +- proxy/src/cache/endpoints.rs | 283 ------------------ proxy/src/cache/mod.rs | 1 - proxy/src/config.rs | 75 +---- proxy/src/context/mod.rs | 48 +-- .../control_plane/client/cplane_proxy_v1.rs | 17 -- proxy/src/control_plane/client/mod.rs | 7 +- proxy/src/control_plane/errors.rs | 11 - proxy/src/rate_limiter/limiter.rs | 38 --- proxy/src/rate_limiter/mod.rs | 2 +- proxy/src/types.rs | 10 - 11 files changed, 11 insertions(+), 504 deletions(-) delete mode 100644 proxy/src/cache/endpoints.rs diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs index c10678dc68..691709ce2a 100644 --- a/proxy/src/binary/proxy.rs +++ b/proxy/src/binary/proxy.rs @@ -21,7 +21,7 @@ use tokio::net::TcpListener; use tokio::sync::Notify; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; -use tracing::{Instrument, error, info, warn}; +use tracing::{error, info, warn}; use utils::sentry_init::init_sentry; use utils::{project_build_tag, project_git_version}; @@ -195,7 +195,9 @@ struct ProxyCliArgs { #[clap(long, default_value = config::ProjectInfoCacheOptions::CACHE_DEFAULT_OPTIONS)] project_info_cache: String, /// cache for all valid endpoints - #[clap(long, default_value = config::EndpointCacheConfig::CACHE_DEFAULT_OPTIONS)] + // TODO: remove after a couple of releases. + #[clap(long, default_value_t = String::new())] + #[deprecated] endpoint_cache_config: String, #[clap(flatten)] parquet_upload: ParquetUploadArgs, @@ -558,13 +560,6 @@ pub async fn run() -> anyhow::Result<()> { } } } - - // listen for notifications of new projects/endpoints/branches - let cache = api.caches.endpoints_cache.clone(); - let span = tracing::info_span!("endpoints_cache"); - maintenance_tasks.spawn( - async move { cache.do_read(client, cancellation_token.clone()).await }.instrument(span), - ); } let maintenance = loop { @@ -712,18 +707,15 @@ fn build_auth_backend( let wake_compute_cache_config: CacheOptions = args.wake_compute_cache.parse()?; let project_info_cache_config: ProjectInfoCacheOptions = args.project_info_cache.parse()?; - let endpoint_cache_config: config::EndpointCacheConfig = - args.endpoint_cache_config.parse()?; info!("Using NodeInfoCache (wake_compute) with options={wake_compute_cache_config:?}"); info!( "Using AllowedIpsCache (wake_compute) with options={project_info_cache_config:?}" ); - info!("Using EndpointCacheConfig with options={endpoint_cache_config:?}"); + let caches = Box::leak(Box::new(control_plane::caches::ApiCaches::new( wake_compute_cache_config, project_info_cache_config, - endpoint_cache_config, ))); let config::ConcurrencyLockOptions { @@ -793,18 +785,15 @@ fn build_auth_backend( let wake_compute_cache_config: CacheOptions = args.wake_compute_cache.parse()?; let project_info_cache_config: ProjectInfoCacheOptions = args.project_info_cache.parse()?; - let endpoint_cache_config: config::EndpointCacheConfig = - args.endpoint_cache_config.parse()?; info!("Using NodeInfoCache (wake_compute) with options={wake_compute_cache_config:?}"); info!( "Using AllowedIpsCache (wake_compute) with options={project_info_cache_config:?}" ); - info!("Using EndpointCacheConfig with options={endpoint_cache_config:?}"); + let caches = Box::leak(Box::new(control_plane::caches::ApiCaches::new( wake_compute_cache_config, project_info_cache_config, - endpoint_cache_config, ))); let config::ConcurrencyLockOptions { diff --git a/proxy/src/cache/endpoints.rs b/proxy/src/cache/endpoints.rs deleted file mode 100644 index 3c88e07484..0000000000 --- a/proxy/src/cache/endpoints.rs +++ /dev/null @@ -1,283 +0,0 @@ -use std::convert::Infallible; -use std::future::pending; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; - -use clashmap::ClashSet; -use redis::streams::{StreamReadOptions, StreamReadReply}; -use redis::{AsyncCommands, FromRedisValue, Value}; -use serde::Deserialize; -use tokio_util::sync::CancellationToken; -use tracing::info; - -use crate::config::EndpointCacheConfig; -use crate::context::RequestContext; -use crate::ext::LockExt; -use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt}; -use crate::metrics::{Metrics, RedisErrors, RedisEventsCount}; -use crate::rate_limiter::GlobalRateLimiter; -use crate::redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider; -use crate::types::EndpointId; - -// TODO: this could be an enum, but events in Redis need to be fixed first. -// ProjectCreated was sent with type:branch_created. So we ignore type. -#[derive(Deserialize, Debug, Clone, PartialEq)] -struct ControlPlaneEvent { - endpoint_created: Option, - branch_created: Option, - project_created: Option, - #[serde(rename = "type")] - _type: Option, -} - -#[derive(Deserialize, Debug, Clone, PartialEq)] -struct EndpointCreated { - endpoint_id: EndpointIdInt, -} - -#[derive(Deserialize, Debug, Clone, PartialEq)] -struct BranchCreated { - branch_id: BranchIdInt, -} - -#[derive(Deserialize, Debug, Clone, PartialEq)] -struct ProjectCreated { - project_id: ProjectIdInt, -} - -impl TryFrom<&Value> for ControlPlaneEvent { - type Error = anyhow::Error; - fn try_from(value: &Value) -> Result { - let json = String::from_redis_value(value)?; - Ok(serde_json::from_str(&json)?) - } -} - -pub struct EndpointsCache { - config: EndpointCacheConfig, - endpoints: ClashSet, - branches: ClashSet, - projects: ClashSet, - ready: AtomicBool, - limiter: Arc>, -} - -impl EndpointsCache { - pub(crate) fn new(config: EndpointCacheConfig) -> Self { - Self { - limiter: Arc::new(Mutex::new(GlobalRateLimiter::new( - config.limiter_info.clone(), - ))), - config, - endpoints: ClashSet::new(), - branches: ClashSet::new(), - projects: ClashSet::new(), - ready: AtomicBool::new(false), - } - } - - pub(crate) fn is_valid(&self, ctx: &RequestContext, endpoint: &EndpointId) -> bool { - if !self.ready.load(Ordering::Acquire) { - // the endpoint cache is not yet fully initialised. - return true; - } - - if !self.should_reject(endpoint) { - ctx.set_rejected(false); - return true; - } - - // report that we might want to reject this endpoint - ctx.set_rejected(true); - - // If cache is disabled, just collect the metrics and return. - if self.config.disable_cache { - return true; - } - - // If the limiter allows, we can pretend like it's valid - // (incase it is, due to redis channel lag). - if self.limiter.lock_propagate_poison().check() { - return true; - } - - // endpoint not found, and there's too much load. - false - } - - fn should_reject(&self, endpoint: &EndpointId) -> bool { - if endpoint.is_endpoint() { - let Some(endpoint) = EndpointIdInt::get(endpoint) else { - // if we haven't interned this endpoint, it's not in the cache. - return true; - }; - !self.endpoints.contains(&endpoint) - } else if endpoint.is_branch() { - let Some(branch) = BranchIdInt::get(endpoint) else { - // if we haven't interned this branch, it's not in the cache. - return true; - }; - !self.branches.contains(&branch) - } else { - let Some(project) = ProjectIdInt::get(endpoint) else { - // if we haven't interned this project, it's not in the cache. - return true; - }; - !self.projects.contains(&project) - } - } - - fn insert_event(&self, event: ControlPlaneEvent) { - if let Some(endpoint_created) = event.endpoint_created { - self.endpoints.insert(endpoint_created.endpoint_id); - Metrics::get() - .proxy - .redis_events_count - .inc(RedisEventsCount::EndpointCreated); - } else if let Some(branch_created) = event.branch_created { - self.branches.insert(branch_created.branch_id); - Metrics::get() - .proxy - .redis_events_count - .inc(RedisEventsCount::BranchCreated); - } else if let Some(project_created) = event.project_created { - self.projects.insert(project_created.project_id); - Metrics::get() - .proxy - .redis_events_count - .inc(RedisEventsCount::ProjectCreated); - } - } - - pub async fn do_read( - &self, - mut con: ConnectionWithCredentialsProvider, - cancellation_token: CancellationToken, - ) -> anyhow::Result { - let mut last_id = "0-0".to_string(); - loop { - if let Err(e) = con.connect().await { - tracing::error!("error connecting to redis: {:?}", e); - self.ready.store(false, Ordering::Release); - } - if let Err(e) = self.read_from_stream(&mut con, &mut last_id).await { - tracing::error!("error reading from redis: {:?}", e); - self.ready.store(false, Ordering::Release); - } - if cancellation_token.is_cancelled() { - info!("cancellation token is cancelled, exiting"); - // Maintenance tasks run forever. Sleep forever when canceled. - pending::<()>().await; - } - tokio::time::sleep(self.config.retry_interval).await; - } - } - - async fn read_from_stream( - &self, - con: &mut ConnectionWithCredentialsProvider, - last_id: &mut String, - ) -> anyhow::Result<()> { - tracing::info!("reading endpoints/branches/projects from redis"); - self.batch_read( - con, - StreamReadOptions::default().count(self.config.initial_batch_size), - last_id, - true, - ) - .await?; - tracing::info!("ready to filter user requests"); - self.ready.store(true, Ordering::Release); - self.batch_read( - con, - StreamReadOptions::default() - .count(self.config.default_batch_size) - .block(self.config.xread_timeout.as_millis() as usize), - last_id, - false, - ) - .await - } - - async fn batch_read( - &self, - conn: &mut ConnectionWithCredentialsProvider, - opts: StreamReadOptions, - last_id: &mut String, - return_when_finish: bool, - ) -> anyhow::Result<()> { - let mut total: usize = 0; - loop { - let mut res: StreamReadReply = conn - .xread_options(&[&self.config.stream_name], &[last_id.as_str()], &opts) - .await?; - - if res.keys.is_empty() { - if return_when_finish { - if total != 0 { - break; - } - anyhow::bail!( - "Redis stream {} is empty, cannot be used to filter endpoints", - self.config.stream_name - ); - } - // If we are not returning when finish, we should wait for more data. - continue; - } - if res.keys.len() != 1 { - anyhow::bail!("Cannot read from redis stream {}", self.config.stream_name); - } - - let key = res.keys.pop().expect("Checked length above"); - let len = key.ids.len(); - for stream_id in key.ids { - total += 1; - for value in stream_id.map.values() { - match value.try_into() { - Ok(event) => self.insert_event(event), - Err(err) => { - Metrics::get().proxy.redis_errors_total.inc(RedisErrors { - channel: &self.config.stream_name, - }); - tracing::error!("error parsing value {value:?}: {err:?}"); - } - } - } - if total.is_power_of_two() { - tracing::debug!("endpoints read {}", total); - } - *last_id = stream_id.id; - } - if return_when_finish && len <= self.config.default_batch_size { - break; - } - } - tracing::info!("read {} endpoints/branches/projects from redis", total); - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_parse_control_plane_event() { - let s = r#"{"branch_created":null,"endpoint_created":{"endpoint_id":"ep-rapid-thunder-w0qqw2q9"},"project_created":null,"type":"endpoint_created"}"#; - - let endpoint_id: EndpointId = "ep-rapid-thunder-w0qqw2q9".into(); - - assert_eq!( - serde_json::from_str::(s).unwrap(), - ControlPlaneEvent { - endpoint_created: Some(EndpointCreated { - endpoint_id: endpoint_id.into(), - }), - branch_created: None, - project_created: None, - _type: Some("endpoint_created".into()), - } - ); - } -} diff --git a/proxy/src/cache/mod.rs b/proxy/src/cache/mod.rs index 6c168144a7..ce7f781213 100644 --- a/proxy/src/cache/mod.rs +++ b/proxy/src/cache/mod.rs @@ -1,5 +1,4 @@ pub(crate) mod common; -pub(crate) mod endpoints; pub(crate) mod project_info; mod timed_lru; diff --git a/proxy/src/config.rs b/proxy/src/config.rs index f97006e206..6157dc8a6a 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -18,7 +18,7 @@ use crate::control_plane::locks::ApiLocks; use crate::control_plane::messages::{EndpointJwksResponse, JwksSettings}; use crate::ext::TaskExt; use crate::intern::RoleNameInt; -use crate::rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig}; +use crate::rate_limiter::{RateLimitAlgorithm, RateLimiterConfig}; use crate::scram::threadpool::ThreadPool; use crate::serverless::GlobalConnPoolOptions; use crate::serverless::cancel_set::CancelSet; @@ -80,79 +80,6 @@ pub struct AuthenticationConfig { pub console_redirect_confirmation_timeout: tokio::time::Duration, } -#[derive(Debug)] -pub struct EndpointCacheConfig { - /// Batch size to receive all endpoints on the startup. - pub initial_batch_size: usize, - /// Batch size to receive endpoints. - pub default_batch_size: usize, - /// Timeouts for the stream read operation. - pub xread_timeout: Duration, - /// Stream name to read from. - pub stream_name: String, - /// Limiter info (to distinguish when to enable cache). - pub limiter_info: Vec, - /// Disable cache. - /// If true, cache is ignored, but reports all statistics. - pub disable_cache: bool, - /// Retry interval for the stream read operation. - pub retry_interval: Duration, -} - -impl EndpointCacheConfig { - /// Default options for [`crate::control_plane::NodeInfoCache`]. - /// Notice that by default the limiter is empty, which means that cache is disabled. - pub const CACHE_DEFAULT_OPTIONS: &'static str = "initial_batch_size=1000,default_batch_size=10,xread_timeout=5m,stream_name=controlPlane,disable_cache=true,limiter_info=1000@1s,retry_interval=1s"; - - /// Parse cache options passed via cmdline. - /// Example: [`Self::CACHE_DEFAULT_OPTIONS`]. - fn parse(options: &str) -> anyhow::Result { - let mut initial_batch_size = None; - let mut default_batch_size = None; - let mut xread_timeout = None; - let mut stream_name = None; - let mut limiter_info = vec![]; - let mut disable_cache = false; - let mut retry_interval = None; - - for option in options.split(',') { - let (key, value) = option - .split_once('=') - .with_context(|| format!("bad key-value pair: {option}"))?; - - match key { - "initial_batch_size" => initial_batch_size = Some(value.parse()?), - "default_batch_size" => default_batch_size = Some(value.parse()?), - "xread_timeout" => xread_timeout = Some(humantime::parse_duration(value)?), - "stream_name" => stream_name = Some(value.to_string()), - "limiter_info" => limiter_info.push(RateBucketInfo::from_str(value)?), - "disable_cache" => disable_cache = value.parse()?, - "retry_interval" => retry_interval = Some(humantime::parse_duration(value)?), - unknown => bail!("unknown key: {unknown}"), - } - } - RateBucketInfo::validate(&mut limiter_info)?; - - Ok(Self { - initial_batch_size: initial_batch_size.context("missing `initial_batch_size`")?, - default_batch_size: default_batch_size.context("missing `default_batch_size`")?, - xread_timeout: xread_timeout.context("missing `xread_timeout`")?, - stream_name: stream_name.context("missing `stream_name`")?, - disable_cache, - limiter_info, - retry_interval: retry_interval.context("missing `retry_interval`")?, - }) - } -} - -impl FromStr for EndpointCacheConfig { - type Err = anyhow::Error; - - fn from_str(options: &str) -> Result { - let error = || format!("failed to parse endpoint cache options '{options}'"); - Self::parse(options).with_context(error) - } -} #[derive(Debug)] pub struct MetricBackupCollectionConfig { pub remote_storage_config: Option, diff --git a/proxy/src/context/mod.rs b/proxy/src/context/mod.rs index 7b0549e76f..3a8828e70c 100644 --- a/proxy/src/context/mod.rs +++ b/proxy/src/context/mod.rs @@ -7,7 +7,7 @@ use once_cell::sync::OnceCell; use smol_str::SmolStr; use tokio::sync::mpsc; use tracing::field::display; -use tracing::{Span, debug, error, info_span}; +use tracing::{Span, error, info_span}; use try_lock::TryLock; use uuid::Uuid; @@ -15,10 +15,7 @@ use self::parquet::RequestData; use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo}; use crate::error::ErrorKind; use crate::intern::{BranchIdInt, ProjectIdInt}; -use crate::metrics::{ - ConnectOutcome, InvalidEndpointsGroup, LatencyAccumulated, LatencyTimer, Metrics, Protocol, - Waiting, -}; +use crate::metrics::{LatencyAccumulated, LatencyTimer, Metrics, Protocol, Waiting}; use crate::pqproto::StartupMessageParams; use crate::protocol2::{ConnectionInfo, ConnectionInfoExtra}; use crate::types::{DbName, EndpointId, RoleName}; @@ -70,8 +67,6 @@ struct RequestContextInner { // This sender is only used to log the length of session in case of success. disconnect_sender: Option>, pub(crate) latency_timer: LatencyTimer, - // Whether proxy decided that it's not a valid endpoint end rejected it before going to cplane. - rejected: Option, disconnect_timestamp: Option>, } @@ -106,7 +101,6 @@ impl Clone for RequestContext { auth_method: inner.auth_method.clone(), jwt_issuer: inner.jwt_issuer.clone(), success: inner.success, - rejected: inner.rejected, cold_start_info: inner.cold_start_info, pg_options: inner.pg_options.clone(), testodrome_query_id: inner.testodrome_query_id.clone(), @@ -151,7 +145,6 @@ impl RequestContext { auth_method: None, jwt_issuer: None, success: false, - rejected: None, cold_start_info: ColdStartInfo::Unknown, pg_options: None, testodrome_query_id: None, @@ -183,11 +176,6 @@ impl RequestContext { ) } - pub(crate) fn set_rejected(&self, rejected: bool) { - let mut this = self.0.try_lock().expect("should not deadlock"); - this.rejected = Some(rejected); - } - pub(crate) fn set_cold_start_info(&self, info: ColdStartInfo) { self.0 .try_lock() @@ -461,38 +449,6 @@ impl RequestContextInner { } fn log_connect(&mut self) { - let outcome = if self.success { - ConnectOutcome::Success - } else { - ConnectOutcome::Failed - }; - - // TODO: get rid of entirely/refactor - // check for false positives - // AND false negatives - if let Some(rejected) = self.rejected { - let ep = self - .endpoint_id - .as_ref() - .map(|x| x.as_str()) - .unwrap_or_default(); - // This makes sense only if cache is disabled - debug!( - ?outcome, - ?rejected, - ?ep, - "check endpoint is valid with outcome" - ); - Metrics::get() - .proxy - .invalid_endpoints_total - .inc(InvalidEndpointsGroup { - protocol: self.protocol, - rejected: rejected.into(), - outcome, - }); - } - if let Some(tx) = self.sender.take() { // If type changes, this error handling needs to be updated. let tx: mpsc::UnboundedSender = tx; diff --git a/proxy/src/control_plane/client/cplane_proxy_v1.rs b/proxy/src/control_plane/client/cplane_proxy_v1.rs index fbacc97661..fc263b73b1 100644 --- a/proxy/src/control_plane/client/cplane_proxy_v1.rs +++ b/proxy/src/control_plane/client/cplane_proxy_v1.rs @@ -159,13 +159,6 @@ impl NeonControlPlaneClient { ctx: &RequestContext, endpoint: &EndpointId, ) -> Result, GetEndpointJwksError> { - if !self - .caches - .endpoints_cache - .is_valid(ctx, &endpoint.normalize()) - { - return Err(GetEndpointJwksError::EndpointNotFound); - } let request_id = ctx.session_id().to_string(); async { let request = self @@ -300,11 +293,6 @@ impl super::ControlPlaneApi for NeonControlPlaneClient { return Ok(secret); } - if !self.caches.endpoints_cache.is_valid(ctx, normalized_ep) { - info!("endpoint is not valid, skipping the request"); - return Err(GetAuthInfoError::UnknownEndpoint); - } - let auth_info = self.do_get_auth_req(ctx, endpoint, role).await?; let control = EndpointAccessControl { @@ -346,11 +334,6 @@ impl super::ControlPlaneApi for NeonControlPlaneClient { return Ok(control); } - if !self.caches.endpoints_cache.is_valid(ctx, normalized_ep) { - info!("endpoint is not valid, skipping the request"); - return Err(GetAuthInfoError::UnknownEndpoint); - } - let auth_info = self.do_get_auth_req(ctx, endpoint, role).await?; let control = EndpointAccessControl { diff --git a/proxy/src/control_plane/client/mod.rs b/proxy/src/control_plane/client/mod.rs index 2ffc589df6..ecd4db29b2 100644 --- a/proxy/src/control_plane/client/mod.rs +++ b/proxy/src/control_plane/client/mod.rs @@ -13,9 +13,8 @@ use tracing::{debug, info}; use super::{EndpointAccessControl, RoleAccessControl}; use crate::auth::backend::ComputeUserInfo; use crate::auth::backend::jwt::{AuthRule, FetchAuthRules, FetchAuthRulesError}; -use crate::cache::endpoints::EndpointsCache; use crate::cache::project_info::ProjectInfoCacheImpl; -use crate::config::{CacheOptions, EndpointCacheConfig, ProjectInfoCacheOptions}; +use crate::config::{CacheOptions, ProjectInfoCacheOptions}; use crate::context::RequestContext; use crate::control_plane::{CachedNodeInfo, ControlPlaneApi, NodeInfoCache, errors}; use crate::error::ReportableError; @@ -121,15 +120,12 @@ pub struct ApiCaches { pub(crate) node_info: NodeInfoCache, /// Cache which stores project_id -> endpoint_ids mapping. pub project_info: Arc, - /// List of all valid endpoints. - pub endpoints_cache: Arc, } impl ApiCaches { pub fn new( wake_compute_cache_config: CacheOptions, project_info_cache_config: ProjectInfoCacheOptions, - endpoint_cache_config: EndpointCacheConfig, ) -> Self { Self { node_info: NodeInfoCache::new( @@ -139,7 +135,6 @@ impl ApiCaches { true, ), project_info: Arc::new(ProjectInfoCacheImpl::new(project_info_cache_config)), - endpoints_cache: Arc::new(EndpointsCache::new(endpoint_cache_config)), } } } diff --git a/proxy/src/control_plane/errors.rs b/proxy/src/control_plane/errors.rs index 77312c89c5..f640657d90 100644 --- a/proxy/src/control_plane/errors.rs +++ b/proxy/src/control_plane/errors.rs @@ -99,10 +99,6 @@ pub(crate) enum GetAuthInfoError { #[error(transparent)] ApiError(ControlPlaneError), - - /// Proxy does not know about the endpoint in advanced - #[error("endpoint not found in endpoint cache")] - UnknownEndpoint, } // This allows more useful interactions than `#[from]`. @@ -119,8 +115,6 @@ impl UserFacingError for GetAuthInfoError { Self::BadSecret => REQUEST_FAILED.to_owned(), // However, API might return a meaningful error. Self::ApiError(e) => e.to_string_client(), - // pretend like control plane returned an error. - Self::UnknownEndpoint => REQUEST_FAILED.to_owned(), } } } @@ -130,8 +124,6 @@ impl ReportableError for GetAuthInfoError { match self { Self::BadSecret => crate::error::ErrorKind::ControlPlane, Self::ApiError(_) => crate::error::ErrorKind::ControlPlane, - // we only apply endpoint filtering if control plane is under high load. - Self::UnknownEndpoint => crate::error::ErrorKind::ServiceRateLimit, } } } @@ -200,9 +192,6 @@ impl CouldRetry for WakeComputeError { #[derive(Debug, Error)] pub enum GetEndpointJwksError { - #[error("endpoint not found")] - EndpointNotFound, - #[error("failed to build control plane request: {0}")] RequestBuild(#[source] reqwest::Error), diff --git a/proxy/src/rate_limiter/limiter.rs b/proxy/src/rate_limiter/limiter.rs index 61d4636c2b..fd1b2af023 100644 --- a/proxy/src/rate_limiter/limiter.rs +++ b/proxy/src/rate_limiter/limiter.rs @@ -16,44 +16,6 @@ use super::LeakyBucketConfig; use crate::ext::LockExt; use crate::intern::EndpointIdInt; -pub struct GlobalRateLimiter { - data: Vec, - info: Vec, -} - -impl GlobalRateLimiter { - pub fn new(info: Vec) -> Self { - Self { - data: vec![ - RateBucket { - start: Instant::now(), - count: 0, - }; - info.len() - ], - info, - } - } - - /// Check that number of connections is below `max_rps` rps. - pub fn check(&mut self) -> bool { - let now = Instant::now(); - - let should_allow_request = self - .data - .iter_mut() - .zip(&self.info) - .all(|(bucket, info)| bucket.should_allow_request(info, now, 1)); - - if should_allow_request { - // only increment the bucket counts if the request will actually be accepted - self.data.iter_mut().for_each(|b| b.inc(1)); - } - - should_allow_request - } -} - // Simple per-endpoint rate limiter. // // Check that number of connections to the endpoint is below `max_rps` rps. diff --git a/proxy/src/rate_limiter/mod.rs b/proxy/src/rate_limiter/mod.rs index 112b95873a..828bb63aac 100644 --- a/proxy/src/rate_limiter/mod.rs +++ b/proxy/src/rate_limiter/mod.rs @@ -8,4 +8,4 @@ pub(crate) use limit_algorithm::aimd::Aimd; pub(crate) use limit_algorithm::{ DynamicLimiter, Outcome, RateLimitAlgorithm, RateLimiterConfig, Token, }; -pub use limiter::{GlobalRateLimiter, RateBucketInfo, WakeComputeRateLimiter}; +pub use limiter::{RateBucketInfo, WakeComputeRateLimiter}; diff --git a/proxy/src/types.rs b/proxy/src/types.rs index d5952d1d8b..43b8dc5b29 100644 --- a/proxy/src/types.rs +++ b/proxy/src/types.rs @@ -107,13 +107,3 @@ smol_str_wrapper!(DbName); // postgres hostname, will likely be a port:ip addr smol_str_wrapper!(Host); - -// Endpoints are a bit tricky. Rare they might be branches or projects. -impl EndpointId { - pub(crate) fn is_endpoint(&self) -> bool { - self.0.starts_with("ep-") - } - pub(crate) fn is_branch(&self) -> bool { - self.0.starts_with("br-") - } -}