mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 21:42:56 +00:00
[proxy] Cache GetEndpointAccessControl errors (#12571)
Related to https://github.com/neondatabase/cloud/issues/19353
This commit is contained in:
committed by
GitHub
parent
8e95455aef
commit
96bcfba79e
@@ -68,6 +68,66 @@ impl NeonControlPlaneClient {
|
||||
self.endpoint.url().as_str()
|
||||
}
|
||||
|
||||
async fn get_and_cache_auth_info<T>(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
endpoint: &EndpointId,
|
||||
role: &RoleName,
|
||||
cache_key: &EndpointId,
|
||||
extract: impl FnOnce(&EndpointAccessControl, &RoleAccessControl) -> T,
|
||||
) -> Result<T, GetAuthInfoError> {
|
||||
match self.do_get_auth_req(ctx, endpoint, role).await {
|
||||
Ok(auth_info) => {
|
||||
let control = EndpointAccessControl {
|
||||
allowed_ips: Arc::new(auth_info.allowed_ips),
|
||||
allowed_vpce: Arc::new(auth_info.allowed_vpc_endpoint_ids),
|
||||
flags: auth_info.access_blocker_flags,
|
||||
rate_limits: auth_info.rate_limits,
|
||||
};
|
||||
let role_control = RoleAccessControl {
|
||||
secret: auth_info.secret,
|
||||
};
|
||||
let res = extract(&control, &role_control);
|
||||
|
||||
self.caches.project_info.insert_endpoint_access(
|
||||
auth_info.account_id,
|
||||
auth_info.project_id,
|
||||
cache_key.into(),
|
||||
role.into(),
|
||||
control,
|
||||
role_control,
|
||||
);
|
||||
|
||||
if let Some(project_id) = auth_info.project_id {
|
||||
ctx.set_project_id(project_id);
|
||||
}
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
Err(err) => match err {
|
||||
GetAuthInfoError::ApiError(ControlPlaneError::Message(ref msg)) => {
|
||||
let retry_info = msg.status.as_ref().and_then(|s| s.details.retry_info);
|
||||
|
||||
// If we can retry this error, do not cache it,
|
||||
// unless we were given a retry delay.
|
||||
if msg.could_retry() && retry_info.is_none() {
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
self.caches.project_info.insert_endpoint_access_err(
|
||||
cache_key.into(),
|
||||
role.into(),
|
||||
msg.clone(),
|
||||
retry_info.map(|r| Duration::from_millis(r.retry_delay_ms)),
|
||||
);
|
||||
|
||||
Err(err)
|
||||
}
|
||||
err => Err(err),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn do_get_auth_req(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
@@ -284,43 +344,34 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
|
||||
ctx: &RequestContext,
|
||||
endpoint: &EndpointId,
|
||||
role: &RoleName,
|
||||
) -> Result<RoleAccessControl, crate::control_plane::errors::GetAuthInfoError> {
|
||||
let normalized_ep = &endpoint.normalize();
|
||||
if let Some(secret) = self
|
||||
) -> Result<RoleAccessControl, GetAuthInfoError> {
|
||||
let key = endpoint.normalize();
|
||||
|
||||
if let Some((role_control, ttl)) = self
|
||||
.caches
|
||||
.project_info
|
||||
.get_role_secret(normalized_ep, role)
|
||||
.get_role_secret_with_ttl(&key, role)
|
||||
{
|
||||
return Ok(secret);
|
||||
return match role_control {
|
||||
Err(mut msg) => {
|
||||
info!(key = &*key, "found cached get_role_access_control error");
|
||||
|
||||
// if retry_delay_ms is set change it to the remaining TTL
|
||||
replace_retry_delay_ms(&mut msg, |_| ttl.as_millis() as u64);
|
||||
|
||||
Err(GetAuthInfoError::ApiError(ControlPlaneError::Message(msg)))
|
||||
}
|
||||
Ok(role_control) => {
|
||||
debug!(key = &*key, "found cached role access control");
|
||||
Ok(role_control)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
let auth_info = self.do_get_auth_req(ctx, endpoint, role).await?;
|
||||
|
||||
let control = EndpointAccessControl {
|
||||
allowed_ips: Arc::new(auth_info.allowed_ips),
|
||||
allowed_vpce: Arc::new(auth_info.allowed_vpc_endpoint_ids),
|
||||
flags: auth_info.access_blocker_flags,
|
||||
rate_limits: auth_info.rate_limits,
|
||||
};
|
||||
let role_control = RoleAccessControl {
|
||||
secret: auth_info.secret,
|
||||
};
|
||||
|
||||
if let Some(project_id) = auth_info.project_id {
|
||||
let normalized_ep_int = normalized_ep.into();
|
||||
|
||||
self.caches.project_info.insert_endpoint_access(
|
||||
auth_info.account_id,
|
||||
project_id,
|
||||
normalized_ep_int,
|
||||
role.into(),
|
||||
control,
|
||||
role_control.clone(),
|
||||
);
|
||||
ctx.set_project_id(project_id);
|
||||
}
|
||||
|
||||
Ok(role_control)
|
||||
self.get_and_cache_auth_info(ctx, endpoint, role, &key, |_, role_control| {
|
||||
role_control.clone()
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
@@ -330,38 +381,30 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
|
||||
endpoint: &EndpointId,
|
||||
role: &RoleName,
|
||||
) -> Result<EndpointAccessControl, GetAuthInfoError> {
|
||||
let normalized_ep = &endpoint.normalize();
|
||||
if let Some(control) = self.caches.project_info.get_endpoint_access(normalized_ep) {
|
||||
return Ok(control);
|
||||
let key = endpoint.normalize();
|
||||
|
||||
if let Some((control, ttl)) = self.caches.project_info.get_endpoint_access_with_ttl(&key) {
|
||||
return match control {
|
||||
Err(mut msg) => {
|
||||
info!(
|
||||
key = &*key,
|
||||
"found cached get_endpoint_access_control error"
|
||||
);
|
||||
|
||||
// if retry_delay_ms is set change it to the remaining TTL
|
||||
replace_retry_delay_ms(&mut msg, |_| ttl.as_millis() as u64);
|
||||
|
||||
Err(GetAuthInfoError::ApiError(ControlPlaneError::Message(msg)))
|
||||
}
|
||||
Ok(control) => {
|
||||
debug!(key = &*key, "found cached endpoint access control");
|
||||
Ok(control)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
let auth_info = self.do_get_auth_req(ctx, endpoint, role).await?;
|
||||
|
||||
let control = EndpointAccessControl {
|
||||
allowed_ips: Arc::new(auth_info.allowed_ips),
|
||||
allowed_vpce: Arc::new(auth_info.allowed_vpc_endpoint_ids),
|
||||
flags: auth_info.access_blocker_flags,
|
||||
rate_limits: auth_info.rate_limits,
|
||||
};
|
||||
let role_control = RoleAccessControl {
|
||||
secret: auth_info.secret,
|
||||
};
|
||||
|
||||
if let Some(project_id) = auth_info.project_id {
|
||||
let normalized_ep_int = normalized_ep.into();
|
||||
|
||||
self.caches.project_info.insert_endpoint_access(
|
||||
auth_info.account_id,
|
||||
project_id,
|
||||
normalized_ep_int,
|
||||
role.into(),
|
||||
control.clone(),
|
||||
role_control,
|
||||
);
|
||||
ctx.set_project_id(project_id);
|
||||
}
|
||||
|
||||
Ok(control)
|
||||
self.get_and_cache_auth_info(ctx, endpoint, role, &key, |control, _| control.clone())
|
||||
.await
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
@@ -390,13 +433,9 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
|
||||
info!(key = &*key, "found cached wake_compute error");
|
||||
|
||||
// if retry_delay_ms is set, reduce it by the amount of time it spent in cache
|
||||
if let Some(status) = &mut msg.status {
|
||||
if let Some(retry_info) = &mut status.details.retry_info {
|
||||
retry_info.retry_delay_ms = retry_info
|
||||
.retry_delay_ms
|
||||
.saturating_sub(created_at.elapsed().as_millis() as u64)
|
||||
}
|
||||
}
|
||||
replace_retry_delay_ms(&mut msg, |delay| {
|
||||
delay.saturating_sub(created_at.elapsed().as_millis() as u64)
|
||||
});
|
||||
|
||||
Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
|
||||
msg,
|
||||
@@ -478,6 +517,14 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
|
||||
}
|
||||
}
|
||||
|
||||
fn replace_retry_delay_ms(msg: &mut ControlPlaneErrorMessage, f: impl FnOnce(u64) -> u64) {
|
||||
if let Some(status) = &mut msg.status
|
||||
&& let Some(retry_info) = &mut status.details.retry_info
|
||||
{
|
||||
retry_info.retry_delay_ms = f(retry_info.retry_delay_ms);
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse http response body, taking status code into account.
|
||||
fn parse_body<T: for<'a> serde::Deserialize<'a>>(
|
||||
status: StatusCode,
|
||||
|
||||
@@ -52,7 +52,7 @@ impl ReportableError for ControlPlaneError {
|
||||
| Reason::EndpointNotFound
|
||||
| Reason::EndpointDisabled
|
||||
| Reason::BranchNotFound
|
||||
| Reason::InvalidEphemeralEndpointOptions => ErrorKind::User,
|
||||
| Reason::WrongLsnOrTimestamp => ErrorKind::User,
|
||||
|
||||
Reason::RateLimitExceeded => ErrorKind::ServiceRateLimit,
|
||||
|
||||
|
||||
@@ -107,7 +107,7 @@ pub(crate) struct ErrorInfo {
|
||||
// Schema could also have `metadata` field, but it's not structured. Skip it for now.
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Deserialize, Default)]
|
||||
#[derive(Clone, Copy, Debug, Deserialize, Default, PartialEq, Eq)]
|
||||
pub(crate) enum Reason {
|
||||
/// RoleProtected indicates that the role is protected and the attempted operation is not permitted on protected roles.
|
||||
#[serde(rename = "ROLE_PROTECTED")]
|
||||
@@ -133,9 +133,9 @@ pub(crate) enum Reason {
|
||||
/// or that the subject doesn't have enough permissions to access the requested branch.
|
||||
#[serde(rename = "BRANCH_NOT_FOUND")]
|
||||
BranchNotFound,
|
||||
/// InvalidEphemeralEndpointOptions indicates that the specified LSN or timestamp are wrong.
|
||||
#[serde(rename = "INVALID_EPHEMERAL_OPTIONS")]
|
||||
InvalidEphemeralEndpointOptions,
|
||||
/// WrongLsnOrTimestamp indicates that the specified LSN or timestamp are wrong.
|
||||
#[serde(rename = "WRONG_LSN_OR_TIMESTAMP")]
|
||||
WrongLsnOrTimestamp,
|
||||
/// RateLimitExceeded indicates that the rate limit for the operation has been exceeded.
|
||||
#[serde(rename = "RATE_LIMIT_EXCEEDED")]
|
||||
RateLimitExceeded,
|
||||
@@ -205,7 +205,7 @@ impl Reason {
|
||||
| Reason::EndpointNotFound
|
||||
| Reason::EndpointDisabled
|
||||
| Reason::BranchNotFound
|
||||
| Reason::InvalidEphemeralEndpointOptions => false,
|
||||
| Reason::WrongLsnOrTimestamp => false,
|
||||
// we were asked to go away
|
||||
Reason::RateLimitExceeded
|
||||
| Reason::NonDefaultBranchComputeTimeExceeded
|
||||
@@ -257,19 +257,19 @@ pub(crate) struct GetEndpointAccessControl {
|
||||
pub(crate) rate_limits: EndpointRateLimitConfig,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Deserialize, Default)]
|
||||
#[derive(Copy, Clone, Deserialize, Default, Debug)]
|
||||
pub struct EndpointRateLimitConfig {
|
||||
pub connection_attempts: ConnectionAttemptsLimit,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Deserialize, Default)]
|
||||
#[derive(Copy, Clone, Deserialize, Default, Debug)]
|
||||
pub struct ConnectionAttemptsLimit {
|
||||
pub tcp: Option<LeakyBucketSetting>,
|
||||
pub ws: Option<LeakyBucketSetting>,
|
||||
pub http: Option<LeakyBucketSetting>,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Deserialize)]
|
||||
#[derive(Copy, Clone, Deserialize, Debug)]
|
||||
pub struct LeakyBucketSetting {
|
||||
pub rps: f64,
|
||||
pub burst: f64,
|
||||
|
||||
@@ -82,7 +82,7 @@ impl NodeInfo {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Default)]
|
||||
#[derive(Copy, Clone, Default, Debug)]
|
||||
pub(crate) struct AccessBlockerFlags {
|
||||
pub public_access_blocked: bool,
|
||||
pub vpc_access_blocked: bool,
|
||||
@@ -92,12 +92,12 @@ pub(crate) type NodeInfoCache =
|
||||
TimedLru<EndpointCacheKey, Result<NodeInfo, Box<ControlPlaneErrorMessage>>>;
|
||||
pub(crate) type CachedNodeInfo = Cached<&'static NodeInfoCache, NodeInfo>;
|
||||
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct RoleAccessControl {
|
||||
pub secret: Option<AuthSecret>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct EndpointAccessControl {
|
||||
pub allowed_ips: Arc<Vec<IpPattern>>,
|
||||
pub allowed_vpce: Arc<Vec<String>>,
|
||||
|
||||
Reference in New Issue
Block a user