diff --git a/proxy/src/auth/backend/mod.rs b/proxy/src/auth/backend/mod.rs index 1bad7b3086..f38ecf715f 100644 --- a/proxy/src/auth/backend/mod.rs +++ b/proxy/src/auth/backend/mod.rs @@ -74,10 +74,6 @@ impl std::fmt::Display for Backend<'_, ()> { .debug_tuple("ControlPlane::ProxyV1") .field(&endpoint.url()) .finish(), - ControlPlaneClient::Neon(endpoint) => fmt - .debug_tuple("ControlPlane::Neon") - .field(&endpoint.url()) - .finish(), #[cfg(any(test, feature = "testing"))] ControlPlaneClient::PostgresMock(endpoint) => fmt .debug_tuple("ControlPlane::PostgresMock") diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index 99144acef0..97c4037009 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -43,9 +43,6 @@ static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; #[derive(Clone, Debug, ValueEnum)] enum AuthBackendType { - #[value(name("console"), alias("cplane"))] - ControlPlane, - #[value(name("cplane-v1"), alias("control-plane"))] ControlPlaneV1, @@ -488,40 +485,7 @@ async fn main() -> anyhow::Result<()> { } if let Either::Left(auth::Backend::ControlPlane(api, _)) = &auth_backend { - if let proxy::control_plane::client::ControlPlaneClient::Neon(api) = &**api { - match (redis_notifications_client, regional_redis_client.clone()) { - (None, None) => {} - (client1, client2) => { - let cache = api.caches.project_info.clone(); - if let Some(client) = client1 { - maintenance_tasks.spawn(notifications::task_main( - client, - cache.clone(), - cancel_map.clone(), - args.region.clone(), - )); - } - if let Some(client) = client2 { - maintenance_tasks.spawn(notifications::task_main( - client, - cache.clone(), - cancel_map.clone(), - args.region.clone(), - )); - } - maintenance_tasks.spawn(async move { cache.clone().gc_worker().await }); - } - } - if let Some(regional_redis_client) = regional_redis_client { - let cache = api.caches.endpoints_cache.clone(); - let con = regional_redis_client; - let span = tracing::info_span!("endpoints_cache"); - maintenance_tasks.spawn( - async move { cache.do_read(con, cancellation_token.clone()).await } - .instrument(span), - ); - } - } else if let proxy::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api { + if let proxy::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api { match (redis_notifications_client, regional_redis_client.clone()) { (None, None) => {} (client1, client2) => { @@ -757,65 +721,6 @@ fn build_auth_backend( Ok(Either::Left(config)) } - AuthBackendType::ControlPlane => { - let wake_compute_cache_config: CacheOptions = args.wake_compute_cache.parse()?; - let project_info_cache_config: ProjectInfoCacheOptions = - args.project_info_cache.parse()?; - let endpoint_cache_config: config::EndpointCacheConfig = - args.endpoint_cache_config.parse()?; - - info!("Using NodeInfoCache (wake_compute) with options={wake_compute_cache_config:?}"); - info!( - "Using AllowedIpsCache (wake_compute) with options={project_info_cache_config:?}" - ); - info!("Using EndpointCacheConfig with options={endpoint_cache_config:?}"); - let caches = Box::leak(Box::new(control_plane::caches::ApiCaches::new( - wake_compute_cache_config, - project_info_cache_config, - endpoint_cache_config, - ))); - - let config::ConcurrencyLockOptions { - shards, - limiter, - epoch, - timeout, - } = args.wake_compute_lock.parse()?; - info!(?limiter, shards, ?epoch, "Using NodeLocks (wake_compute)"); - let locks = Box::leak(Box::new(control_plane::locks::ApiLocks::new( - "wake_compute_lock", - limiter, - shards, - timeout, - epoch, - &Metrics::get().wake_compute_lock, - )?)); - tokio::spawn(locks.garbage_collect_worker()); - - let url: proxy::url::ApiUrl = args.auth_endpoint.parse()?; - - let endpoint = http::Endpoint::new(url, http::new_client()); - - let mut wake_compute_rps_limit = args.wake_compute_limit.clone(); - RateBucketInfo::validate(&mut wake_compute_rps_limit)?; - let wake_compute_endpoint_rate_limiter = - Arc::new(WakeComputeRateLimiter::new(wake_compute_rps_limit)); - - let api = control_plane::client::neon::NeonControlPlaneClient::new( - endpoint, - args.control_plane_token.clone(), - caches, - locks, - wake_compute_endpoint_rate_limiter, - ); - let api = control_plane::client::ControlPlaneClient::Neon(api); - let auth_backend = auth::Backend::ControlPlane(MaybeOwned::Owned(api), ()); - - let config = Box::leak(Box::new(auth_backend)); - - Ok(Either::Left(config)) - } - #[cfg(feature = "testing")] AuthBackendType::Postgres => { let url = args.auth_endpoint.parse()?; diff --git a/proxy/src/control_plane/client/mod.rs b/proxy/src/control_plane/client/mod.rs index 7ef5a9c9fd..d559d96bbc 100644 --- a/proxy/src/control_plane/client/mod.rs +++ b/proxy/src/control_plane/client/mod.rs @@ -1,7 +1,6 @@ pub mod cplane_proxy_v1; #[cfg(any(test, feature = "testing"))] pub mod mock; -pub mod neon; use std::hash::Hash; use std::sync::Arc; @@ -28,10 +27,8 @@ use crate::types::EndpointId; #[non_exhaustive] #[derive(Clone)] pub enum ControlPlaneClient { - /// New Proxy V1 control plane API + /// Proxy V1 control plane API ProxyV1(cplane_proxy_v1::NeonControlPlaneClient), - /// Current Management API (V2). - Neon(neon::NeonControlPlaneClient), /// Local mock control plane. #[cfg(any(test, feature = "testing"))] PostgresMock(mock::MockControlPlane), @@ -49,7 +46,6 @@ impl ControlPlaneApi for ControlPlaneClient { ) -> Result { match self { Self::ProxyV1(api) => api.get_role_secret(ctx, user_info).await, - Self::Neon(api) => api.get_role_secret(ctx, user_info).await, #[cfg(any(test, feature = "testing"))] Self::PostgresMock(api) => api.get_role_secret(ctx, user_info).await, #[cfg(test)] @@ -66,7 +62,6 @@ impl ControlPlaneApi for ControlPlaneClient { ) -> Result<(CachedAllowedIps, Option), errors::GetAuthInfoError> { match self { Self::ProxyV1(api) => api.get_allowed_ips_and_secret(ctx, user_info).await, - Self::Neon(api) => api.get_allowed_ips_and_secret(ctx, user_info).await, #[cfg(any(test, feature = "testing"))] Self::PostgresMock(api) => api.get_allowed_ips_and_secret(ctx, user_info).await, #[cfg(test)] @@ -81,7 +76,6 @@ impl ControlPlaneApi for ControlPlaneClient { ) -> Result, errors::GetEndpointJwksError> { match self { Self::ProxyV1(api) => api.get_endpoint_jwks(ctx, endpoint).await, - Self::Neon(api) => api.get_endpoint_jwks(ctx, endpoint).await, #[cfg(any(test, feature = "testing"))] Self::PostgresMock(api) => api.get_endpoint_jwks(ctx, endpoint).await, #[cfg(test)] @@ -96,7 +90,6 @@ impl ControlPlaneApi for ControlPlaneClient { ) -> Result { match self { Self::ProxyV1(api) => api.wake_compute(ctx, user_info).await, - Self::Neon(api) => api.wake_compute(ctx, user_info).await, #[cfg(any(test, feature = "testing"))] Self::PostgresMock(api) => api.wake_compute(ctx, user_info).await, #[cfg(test)] diff --git a/proxy/src/control_plane/client/neon.rs b/proxy/src/control_plane/client/neon.rs deleted file mode 100644 index bf62c0d6ab..0000000000 --- a/proxy/src/control_plane/client/neon.rs +++ /dev/null @@ -1,511 +0,0 @@ -//! Stale console backend, remove after migrating to Proxy V1 API (#15245). - -use std::sync::Arc; -use std::time::Duration; - -use ::http::header::AUTHORIZATION; -use ::http::HeaderName; -use futures::TryFutureExt; -use postgres_client::config::SslMode; -use tokio::time::Instant; -use tracing::{debug, info, info_span, warn, Instrument}; - -use super::super::messages::{ControlPlaneErrorMessage, GetRoleSecret, WakeCompute}; -use crate::auth::backend::jwt::AuthRule; -use crate::auth::backend::ComputeUserInfo; -use crate::cache::Cached; -use crate::context::RequestContext; -use crate::control_plane::caches::ApiCaches; -use crate::control_plane::errors::{ - ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError, -}; -use crate::control_plane::locks::ApiLocks; -use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, Reason}; -use crate::control_plane::{ - AuthInfo, AuthSecret, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret, NodeInfo, -}; -use crate::metrics::{CacheOutcome, Metrics}; -use crate::rate_limiter::WakeComputeRateLimiter; -use crate::types::{EndpointCacheKey, EndpointId}; -use crate::{compute, http, scram}; - -const X_REQUEST_ID: HeaderName = HeaderName::from_static("x-request-id"); - -#[derive(Clone)] -pub struct NeonControlPlaneClient { - endpoint: http::Endpoint, - pub caches: &'static ApiCaches, - pub(crate) locks: &'static ApiLocks, - pub(crate) wake_compute_endpoint_rate_limiter: Arc, - // put in a shared ref so we don't copy secrets all over in memory - jwt: Arc, -} - -impl NeonControlPlaneClient { - /// Construct an API object containing the auth parameters. - pub fn new( - endpoint: http::Endpoint, - jwt: Arc, - caches: &'static ApiCaches, - locks: &'static ApiLocks, - wake_compute_endpoint_rate_limiter: Arc, - ) -> Self { - Self { - endpoint, - caches, - locks, - wake_compute_endpoint_rate_limiter, - jwt, - } - } - - pub(crate) fn url(&self) -> &str { - self.endpoint.url().as_str() - } - - async fn do_get_auth_info( - &self, - ctx: &RequestContext, - user_info: &ComputeUserInfo, - ) -> Result { - if !self - .caches - .endpoints_cache - .is_valid(ctx, &user_info.endpoint.normalize()) - { - // TODO: refactor this because it's weird - // this is a failure to authenticate but we return Ok. - info!("endpoint is not valid, skipping the request"); - return Ok(AuthInfo::default()); - } - let request_id = ctx.session_id().to_string(); - let application_name = ctx.console_application_name(); - async { - let request = self - .endpoint - .get_path("proxy_get_role_secret") - .header(X_REQUEST_ID, &request_id) - .header(AUTHORIZATION, format!("Bearer {}", &self.jwt)) - .query(&[("session_id", ctx.session_id())]) - .query(&[ - ("application_name", application_name.as_str()), - ("project", user_info.endpoint.as_str()), - ("role", user_info.user.as_str()), - ]) - .build()?; - - debug!(url = request.url().as_str(), "sending http request"); - let start = Instant::now(); - let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane); - let response = self.endpoint.execute(request).await?; - drop(pause); - info!(duration = ?start.elapsed(), "received http response"); - let body = match parse_body::(response).await { - Ok(body) => body, - // Error 404 is special: it's ok not to have a secret. - // TODO(anna): retry - Err(e) => { - return if e.get_reason().is_not_found() { - // TODO: refactor this because it's weird - // this is a failure to authenticate but we return Ok. - Ok(AuthInfo::default()) - } else { - Err(e.into()) - }; - } - }; - - let secret = if body.role_secret.is_empty() { - None - } else { - let secret = scram::ServerSecret::parse(&body.role_secret) - .map(AuthSecret::Scram) - .ok_or(GetAuthInfoError::BadSecret)?; - Some(secret) - }; - let allowed_ips = body.allowed_ips.unwrap_or_default(); - Metrics::get() - .proxy - .allowed_ips_number - .observe(allowed_ips.len() as f64); - Ok(AuthInfo { - secret, - allowed_ips, - project_id: body.project_id, - }) - } - .inspect_err(|e| tracing::debug!(error = ?e)) - .instrument(info_span!("do_get_auth_info")) - .await - } - - async fn do_get_endpoint_jwks( - &self, - ctx: &RequestContext, - endpoint: EndpointId, - ) -> Result, GetEndpointJwksError> { - if !self - .caches - .endpoints_cache - .is_valid(ctx, &endpoint.normalize()) - { - return Err(GetEndpointJwksError::EndpointNotFound); - } - let request_id = ctx.session_id().to_string(); - async { - let request = self - .endpoint - .get_with_url(|url| { - url.path_segments_mut() - .push("endpoints") - .push(endpoint.as_str()) - .push("jwks"); - }) - .header(X_REQUEST_ID, &request_id) - .header(AUTHORIZATION, format!("Bearer {}", &self.jwt)) - .query(&[("session_id", ctx.session_id())]) - .build() - .map_err(GetEndpointJwksError::RequestBuild)?; - - debug!(url = request.url().as_str(), "sending http request"); - let start = Instant::now(); - let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane); - let response = self - .endpoint - .execute(request) - .await - .map_err(GetEndpointJwksError::RequestExecute)?; - drop(pause); - info!(duration = ?start.elapsed(), "received http response"); - - let body = parse_body::(response).await?; - - let rules = body - .jwks - .into_iter() - .map(|jwks| AuthRule { - id: jwks.id, - jwks_url: jwks.jwks_url, - audience: jwks.jwt_audience, - role_names: jwks.role_names, - }) - .collect(); - - Ok(rules) - } - .inspect_err(|e| tracing::debug!(error = ?e)) - .instrument(info_span!("do_get_endpoint_jwks")) - .await - } - - async fn do_wake_compute( - &self, - ctx: &RequestContext, - user_info: &ComputeUserInfo, - ) -> Result { - let request_id = ctx.session_id().to_string(); - let application_name = ctx.console_application_name(); - async { - let mut request_builder = self - .endpoint - .get_path("proxy_wake_compute") - .header("X-Request-ID", &request_id) - .header("Authorization", format!("Bearer {}", &self.jwt)) - .query(&[("session_id", ctx.session_id())]) - .query(&[ - ("application_name", application_name.as_str()), - ("project", user_info.endpoint.as_str()), - ]); - - let options = user_info.options.to_deep_object(); - if !options.is_empty() { - request_builder = request_builder.query(&options); - } - - let request = request_builder.build()?; - - debug!(url = request.url().as_str(), "sending http request"); - let start = Instant::now(); - let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane); - let response = self.endpoint.execute(request).await?; - drop(pause); - info!(duration = ?start.elapsed(), "received http response"); - let body = parse_body::(response).await?; - - // Unfortunately, ownership won't let us use `Option::ok_or` here. - let (host, port) = match parse_host_port(&body.address) { - None => return Err(WakeComputeError::BadComputeAddress(body.address)), - Some(x) => x, - }; - - // Don't set anything but host and port! This config will be cached. - // We'll set username and such later using the startup message. - // TODO: add more type safety (in progress). - let mut config = compute::ConnCfg::new(host.to_owned(), port); - config.ssl_mode(SslMode::Disable); // TLS is not configured on compute nodes. - - let node = NodeInfo { - config, - aux: body.aux, - allow_self_signed_compute: false, - }; - - Ok(node) - } - .inspect_err(|e| tracing::debug!(error = ?e)) - .instrument(info_span!("do_wake_compute")) - .await - } -} - -impl super::ControlPlaneApi for NeonControlPlaneClient { - #[tracing::instrument(skip_all)] - async fn get_role_secret( - &self, - ctx: &RequestContext, - user_info: &ComputeUserInfo, - ) -> Result { - let normalized_ep = &user_info.endpoint.normalize(); - let user = &user_info.user; - if let Some(role_secret) = self - .caches - .project_info - .get_role_secret(normalized_ep, user) - { - return Ok(role_secret); - } - let auth_info = self.do_get_auth_info(ctx, user_info).await?; - if let Some(project_id) = auth_info.project_id { - let normalized_ep_int = normalized_ep.into(); - self.caches.project_info.insert_role_secret( - project_id, - normalized_ep_int, - user.into(), - auth_info.secret.clone(), - ); - self.caches.project_info.insert_allowed_ips( - project_id, - normalized_ep_int, - Arc::new(auth_info.allowed_ips), - ); - ctx.set_project_id(project_id); - } - // When we just got a secret, we don't need to invalidate it. - Ok(Cached::new_uncached(auth_info.secret)) - } - - async fn get_allowed_ips_and_secret( - &self, - ctx: &RequestContext, - user_info: &ComputeUserInfo, - ) -> Result<(CachedAllowedIps, Option), GetAuthInfoError> { - let normalized_ep = &user_info.endpoint.normalize(); - if let Some(allowed_ips) = self.caches.project_info.get_allowed_ips(normalized_ep) { - Metrics::get() - .proxy - .allowed_ips_cache_misses - .inc(CacheOutcome::Hit); - return Ok((allowed_ips, None)); - } - Metrics::get() - .proxy - .allowed_ips_cache_misses - .inc(CacheOutcome::Miss); - let auth_info = self.do_get_auth_info(ctx, user_info).await?; - let allowed_ips = Arc::new(auth_info.allowed_ips); - let user = &user_info.user; - if let Some(project_id) = auth_info.project_id { - let normalized_ep_int = normalized_ep.into(); - self.caches.project_info.insert_role_secret( - project_id, - normalized_ep_int, - user.into(), - auth_info.secret.clone(), - ); - self.caches.project_info.insert_allowed_ips( - project_id, - normalized_ep_int, - allowed_ips.clone(), - ); - ctx.set_project_id(project_id); - } - Ok(( - Cached::new_uncached(allowed_ips), - Some(Cached::new_uncached(auth_info.secret)), - )) - } - - #[tracing::instrument(skip_all)] - async fn get_endpoint_jwks( - &self, - ctx: &RequestContext, - endpoint: EndpointId, - ) -> Result, GetEndpointJwksError> { - self.do_get_endpoint_jwks(ctx, endpoint).await - } - - #[tracing::instrument(skip_all)] - async fn wake_compute( - &self, - ctx: &RequestContext, - user_info: &ComputeUserInfo, - ) -> Result { - let key = user_info.endpoint_cache_key(); - - macro_rules! check_cache { - () => { - if let Some(cached) = self.caches.node_info.get(&key) { - let (cached, info) = cached.take_value(); - let info = info.map_err(|c| { - info!(key = &*key, "found cached wake_compute error"); - WakeComputeError::ControlPlane(ControlPlaneError::Message(Box::new(*c))) - })?; - - debug!(key = &*key, "found cached compute node info"); - ctx.set_project(info.aux.clone()); - return Ok(cached.map(|()| info)); - } - }; - } - - // Every time we do a wakeup http request, the compute node will stay up - // for some time (highly depends on the console's scale-to-zero policy); - // The connection info remains the same during that period of time, - // which means that we might cache it to reduce the load and latency. - check_cache!(); - - let permit = self.locks.get_permit(&key).await?; - - // after getting back a permit - it's possible the cache was filled - // double check - if permit.should_check_cache() { - // TODO: if there is something in the cache, mark the permit as success. - check_cache!(); - } - - // check rate limit - if !self - .wake_compute_endpoint_rate_limiter - .check(user_info.endpoint.normalize_intern(), 1) - { - return Err(WakeComputeError::TooManyConnections); - } - - let node = permit.release_result(self.do_wake_compute(ctx, user_info).await); - match node { - Ok(node) => { - ctx.set_project(node.aux.clone()); - debug!(key = &*key, "created a cache entry for woken compute node"); - - let mut stored_node = node.clone(); - // store the cached node as 'warm_cached' - stored_node.aux.cold_start_info = ColdStartInfo::WarmCached; - - let (_, cached) = self.caches.node_info.insert_unit(key, Ok(stored_node)); - - Ok(cached.map(|()| node)) - } - Err(err) => match err { - WakeComputeError::ControlPlane(ControlPlaneError::Message(err)) => { - let Some(status) = &err.status else { - return Err(WakeComputeError::ControlPlane(ControlPlaneError::Message( - err, - ))); - }; - - let reason = status - .details - .error_info - .map_or(Reason::Unknown, |x| x.reason); - - // if we can retry this error, do not cache it. - if reason.can_retry() { - return Err(WakeComputeError::ControlPlane(ControlPlaneError::Message( - err, - ))); - } - - // at this point, we should only have quota errors. - debug!( - key = &*key, - "created a cache entry for the wake compute error" - ); - - self.caches.node_info.insert_ttl( - key, - Err(err.clone()), - Duration::from_secs(30), - ); - - Err(WakeComputeError::ControlPlane(ControlPlaneError::Message( - err, - ))) - } - err => return Err(err), - }, - } - } -} - -/// Parse http response body, taking status code into account. -async fn parse_body serde::Deserialize<'a>>( - response: http::Response, -) -> Result { - let status = response.status(); - if status.is_success() { - // We shouldn't log raw body because it may contain secrets. - info!("request succeeded, processing the body"); - return Ok(response.json().await?); - } - let s = response.bytes().await?; - // Log plaintext to be able to detect, whether there are some cases not covered by the error struct. - info!("response_error plaintext: {:?}", s); - - // Don't throw an error here because it's not as important - // as the fact that the request itself has failed. - let mut body = serde_json::from_slice(&s).unwrap_or_else(|e| { - warn!("failed to parse error body: {e}"); - ControlPlaneErrorMessage { - error: "reason unclear (malformed error message)".into(), - http_status_code: status, - status: None, - } - }); - body.http_status_code = status; - - warn!("console responded with an error ({status}): {body:?}"); - Err(ControlPlaneError::Message(Box::new(body))) -} - -fn parse_host_port(input: &str) -> Option<(&str, u16)> { - let (host, port) = input.rsplit_once(':')?; - let ipv6_brackets: &[_] = &['[', ']']; - Some((host.trim_matches(ipv6_brackets), port.parse().ok()?)) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_parse_host_port_v4() { - let (host, port) = parse_host_port("127.0.0.1:5432").expect("failed to parse"); - assert_eq!(host, "127.0.0.1"); - assert_eq!(port, 5432); - } - - #[test] - fn test_parse_host_port_v6() { - let (host, port) = parse_host_port("[2001:db8::1]:5432").expect("failed to parse"); - assert_eq!(host, "2001:db8::1"); - assert_eq!(port, 5432); - } - - #[test] - fn test_parse_host_port_url() { - let (host, port) = parse_host_port("compute-foo-bar-1234.default.svc.cluster.local:5432") - .expect("failed to parse"); - assert_eq!(host, "compute-foo-bar-1234.default.svc.cluster.local"); - assert_eq!(port, 5432); - } -} diff --git a/proxy/src/control_plane/messages.rs b/proxy/src/control_plane/messages.rs index 2662ab85f9..d068614b24 100644 --- a/proxy/src/control_plane/messages.rs +++ b/proxy/src/control_plane/messages.rs @@ -221,15 +221,6 @@ pub(crate) struct UserFacingMessage { pub(crate) message: Box, } -/// Response which holds client's auth secret, e.g. [`crate::scram::ServerSecret`]. -/// Returned by the `/proxy_get_role_secret` API method. -#[derive(Deserialize)] -pub(crate) struct GetRoleSecret { - pub(crate) role_secret: Box, - pub(crate) allowed_ips: Option>, - pub(crate) project_id: Option, -} - /// Response which holds client's auth secret, e.g. [`crate::scram::ServerSecret`]. /// Returned by the `/get_endpoint_access_control` API method. #[derive(Deserialize)] @@ -240,13 +231,6 @@ pub(crate) struct GetEndpointAccessControl { pub(crate) allowed_vpc_endpoint_ids: Option>, } -// Manually implement debug to omit sensitive info. -impl fmt::Debug for GetRoleSecret { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("GetRoleSecret").finish_non_exhaustive() - } -} - /// Response which holds compute node's `host:port` pair. /// Returned by the `/proxy_wake_compute` API method. #[derive(Debug, Deserialize)] @@ -477,18 +461,18 @@ mod tests { let json = json!({ "role_secret": "secret", }); - serde_json::from_str::(&json.to_string())?; + serde_json::from_str::(&json.to_string())?; let json = json!({ "role_secret": "secret", "allowed_ips": ["8.8.8.8"], }); - serde_json::from_str::(&json.to_string())?; + serde_json::from_str::(&json.to_string())?; let json = json!({ "role_secret": "secret", "allowed_ips": ["8.8.8.8"], "project_id": "project", }); - serde_json::from_str::(&json.to_string())?; + serde_json::from_str::(&json.to_string())?; Ok(()) } diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 0b7cdec50d..13ada1361e 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -3222,7 +3222,7 @@ class NeonProxy(PgProtocol): *["--allow-self-signed-compute", "true"], ] - class Console(AuthBackend): + class ProxyV1(AuthBackend): def __init__(self, endpoint: str, fixed_rate_limit: int | None = None): self.endpoint = endpoint self.fixed_rate_limit = fixed_rate_limit @@ -3230,7 +3230,7 @@ class NeonProxy(PgProtocol): def extra_args(self) -> list[str]: args = [ # Console auth backend params - *["--auth-backend", "console"], + *["--auth-backend", "cplane-v1"], *["--auth-endpoint", self.endpoint], *["--sql-over-http-pool-opt-in", "false"], ] @@ -3478,13 +3478,13 @@ class NeonProxy(PgProtocol): class NeonAuthBroker: - class ControlPlane: + class ProxyV1: def __init__(self, endpoint: str): self.endpoint = endpoint def extra_args(self) -> list[str]: args = [ - *["--auth-backend", "console"], + *["--auth-backend", "cplane-v1"], *["--auth-endpoint", self.endpoint], ] return args @@ -3496,7 +3496,7 @@ class NeonAuthBroker: http_port: int, mgmt_port: int, external_http_port: int, - auth_backend: NeonAuthBroker.ControlPlane, + auth_backend: NeonAuthBroker.ProxyV1, ): self.domain = "apiauth.localtest.me" # resolves to 127.0.0.1 self.host = "127.0.0.1" @@ -3682,7 +3682,7 @@ def static_auth_broker( local_proxy_addr = f"{http2_echoserver.host}:{http2_echoserver.port}" # return local_proxy addr on ProxyWakeCompute. - httpserver.expect_request("/cplane/proxy_wake_compute").respond_with_json( + httpserver.expect_request("/cplane/wake_compute").respond_with_json( { "address": local_proxy_addr, "aux": { @@ -3722,7 +3722,7 @@ def static_auth_broker( http_port=http_port, mgmt_port=mgmt_port, external_http_port=external_http_port, - auth_backend=NeonAuthBroker.ControlPlane(httpserver.url_for("/cplane")), + auth_backend=NeonAuthBroker.ProxyV1(httpserver.url_for("/cplane")), ) as proxy: proxy.start() yield proxy