proxy: rate limit authentication by masked IPv6. (#7316)

## Problem

Many users have access to ipv6 subnets (eg a /64). That gives them 2^64
addresses to play with

## Summary of changes

Truncate the address to /64 to reduce the attack surface.

Todo:
~~Will NAT64 be an issue here? AFAIU they put the IPv4 address at the
end of the IPv6 address. By truncating we will lose all that detail.~~
It's the same problem as a host sharing IPv6 addresses between clients.
I don't think it's up to us to solve. If a customer is getting DDoSed,
then they likely need to arrange a dedicated IP with us.
This commit is contained in:
Conrad Ludgate
2024-04-16 15:16:34 +01:00
committed by GitHub
parent 926662eb7c
commit e5c50bb12b
7 changed files with 118 additions and 66 deletions

View File

@@ -2,8 +2,15 @@ mod classic;
mod hacks;
mod link;
use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;
use ipnet::{Ipv4Net, Ipv6Net};
pub use link::LinkAuthError;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_postgres::config::AuthKeys;
use tracing::{info, warn};
use crate::auth::credentials::check_peer_addr_is_in_list;
use crate::auth::validate_password_and_exchange;
@@ -16,6 +23,7 @@ use crate::intern::EndpointIdInt;
use crate::metrics::Metrics;
use crate::proxy::connect_compute::ComputeConnectBackend;
use crate::proxy::NeonOptions;
use crate::rate_limiter::{BucketRateLimiter, RateBucketInfo};
use crate::stream::Stream;
use crate::{
auth::{self, ComputeUserInfoMaybeEndpoint},
@@ -28,9 +36,6 @@ use crate::{
stream, url,
};
use crate::{scram, EndpointCacheKey, EndpointId, Normalize, RoleName};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, warn};
/// Alternative to [`std::borrow::Cow`] but doesn't need `T: ToOwned` as we don't need that functionality
pub enum MaybeOwned<'a, T> {
@@ -176,11 +181,45 @@ impl TryFrom<ComputeUserInfoMaybeEndpoint> for ComputeUserInfo {
}
}
#[derive(PartialEq, PartialOrd, Hash, Eq, Ord, Debug, Copy, Clone)]
pub struct MaskedIp(IpAddr);
impl MaskedIp {
fn new(value: IpAddr, prefix: u8) -> Self {
match value {
IpAddr::V4(v4) => Self(IpAddr::V4(
Ipv4Net::new(v4, prefix).map_or(v4, |x| x.trunc().addr()),
)),
IpAddr::V6(v6) => Self(IpAddr::V6(
Ipv6Net::new(v6, prefix).map_or(v6, |x| x.trunc().addr()),
)),
}
}
}
// This can't be just per IP because that would limit some PaaS that share IP addresses
pub type AuthRateLimiter = BucketRateLimiter<(EndpointIdInt, MaskedIp)>;
impl RateBucketInfo {
/// All of these are per endpoint-maskedip pair.
/// Context: 4096 rounds of pbkdf2 take about 1ms of cpu time to execute (1 milli-cpu-second or 1mcpus).
///
/// First bucket: 1000mcpus total per endpoint-ip pair
/// * 4096000 requests per second with 1 hash rounds.
/// * 1000 requests per second with 4096 hash rounds.
/// * 6.8 requests per second with 600000 hash rounds.
pub const DEFAULT_AUTH_SET: [Self; 3] = [
Self::new(1000 * 4096, Duration::from_secs(1)),
Self::new(600 * 4096, Duration::from_secs(60)),
Self::new(300 * 4096, Duration::from_secs(600)),
];
}
impl AuthenticationConfig {
pub fn check_rate_limit(
&self,
ctx: &mut RequestMonitoring,
config: &AuthenticationConfig,
secret: AuthSecret,
endpoint: &EndpointId,
is_cleartext: bool,
@@ -201,9 +240,13 @@ impl AuthenticationConfig {
1
};
let limit_not_exceeded = self
.rate_limiter
.check((endpoint_int, ctx.peer_addr), password_weight);
let limit_not_exceeded = self.rate_limiter.check(
(
endpoint_int,
MaskedIp::new(ctx.peer_addr, config.rate_limit_ip_subnet),
),
password_weight,
);
if !limit_not_exceeded {
warn!(
@@ -271,6 +314,7 @@ async fn auth_quirks(
let secret = match secret {
Some(secret) => config.check_rate_limit(
ctx,
config,
secret,
&info.endpoint,
unauthenticated_password.is_some() || allow_cleartext,
@@ -473,7 +517,7 @@ impl ComputeConnectBackend for BackendType<'_, ComputeCredentials, &()> {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::{net::IpAddr, sync::Arc, time::Duration};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
@@ -486,7 +530,7 @@ mod tests {
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use crate::{
auth::{ComputeUserInfoMaybeEndpoint, IpPattern},
auth::{backend::MaskedIp, ComputeUserInfoMaybeEndpoint, IpPattern},
config::AuthenticationConfig,
console::{
self,
@@ -495,12 +539,12 @@ mod tests {
},
context::RequestMonitoring,
proxy::NeonOptions,
rate_limiter::{AuthRateLimiter, RateBucketInfo},
rate_limiter::RateBucketInfo,
scram::ServerSecret,
stream::{PqStream, Stream},
};
use super::auth_quirks;
use super::{auth_quirks, AuthRateLimiter};
struct Auth {
ips: Vec<IpPattern>,
@@ -541,6 +585,7 @@ mod tests {
scram_protocol_timeout: std::time::Duration::from_secs(5),
rate_limiter_enabled: true,
rate_limiter: AuthRateLimiter::new(&RateBucketInfo::DEFAULT_AUTH_SET),
rate_limit_ip_subnet: 64,
});
async fn read_message(r: &mut (impl AsyncRead + Unpin), b: &mut BytesMut) -> PgMessage {
@@ -552,6 +597,51 @@ mod tests {
}
}
#[test]
fn masked_ip() {
let ip_a = IpAddr::V4([127, 0, 0, 1].into());
let ip_b = IpAddr::V4([127, 0, 0, 2].into());
let ip_c = IpAddr::V4([192, 168, 1, 101].into());
let ip_d = IpAddr::V4([192, 168, 1, 102].into());
let ip_e = IpAddr::V6("abcd:abcd:abcd:abcd:abcd:abcd:abcd:abcd".parse().unwrap());
let ip_f = IpAddr::V6("abcd:abcd:abcd:abcd:1234:abcd:abcd:abcd".parse().unwrap());
assert_ne!(MaskedIp::new(ip_a, 64), MaskedIp::new(ip_b, 64));
assert_ne!(MaskedIp::new(ip_a, 32), MaskedIp::new(ip_b, 32));
assert_eq!(MaskedIp::new(ip_a, 30), MaskedIp::new(ip_b, 30));
assert_eq!(MaskedIp::new(ip_c, 30), MaskedIp::new(ip_d, 30));
assert_ne!(MaskedIp::new(ip_e, 128), MaskedIp::new(ip_f, 128));
assert_eq!(MaskedIp::new(ip_e, 64), MaskedIp::new(ip_f, 64));
}
#[test]
fn test_default_auth_rate_limit_set() {
// these values used to exceed u32::MAX
assert_eq!(
RateBucketInfo::DEFAULT_AUTH_SET,
[
RateBucketInfo {
interval: Duration::from_secs(1),
max_rpi: 1000 * 4096,
},
RateBucketInfo {
interval: Duration::from_secs(60),
max_rpi: 600 * 4096 * 60,
},
RateBucketInfo {
interval: Duration::from_secs(600),
max_rpi: 300 * 4096 * 600,
}
]
);
for x in RateBucketInfo::DEFAULT_AUTH_SET {
let y = x.to_string().parse().unwrap();
assert_eq!(x, y);
}
}
#[tokio::test]
async fn auth_quirks_scram() {
let (mut client, server) = tokio::io::duplex(1024);

View File

@@ -7,6 +7,7 @@ use aws_config::provider_config::ProviderConfig;
use aws_config::web_identity_token::WebIdentityTokenCredentialsProvider;
use futures::future::Either;
use proxy::auth;
use proxy::auth::backend::AuthRateLimiter;
use proxy::auth::backend::MaybeOwned;
use proxy::cancellation::CancelMap;
use proxy::cancellation::CancellationHandler;
@@ -20,7 +21,6 @@ use proxy::context::parquet::ParquetUploadArgs;
use proxy::http;
use proxy::http::health_server::AppMetrics;
use proxy::metrics::Metrics;
use proxy::rate_limiter::AuthRateLimiter;
use proxy::rate_limiter::EndpointRateLimiter;
use proxy::rate_limiter::RateBucketInfo;
use proxy::rate_limiter::RateLimiterConfig;
@@ -152,6 +152,9 @@ struct ProxyCliArgs {
/// Authentication rate limiter max number of hashes per second.
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_AUTH_SET)]
auth_rate_limit: Vec<RateBucketInfo>,
/// The IP subnet to use when considering whether two IP addresses are considered the same.
#[clap(long, default_value_t = 64)]
auth_rate_limit_ip_subnet: u8,
/// Redis rate limiter max number of requests per second.
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_ENDPOINT_SET)]
redis_rps_limit: Vec<RateBucketInfo>,
@@ -575,6 +578,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
scram_protocol_timeout: args.scram_protocol_timeout,
rate_limiter_enabled: args.auth_rate_limit_enabled,
rate_limiter: AuthRateLimiter::new(args.auth_rate_limit.clone()),
rate_limit_ip_subnet: args.auth_rate_limit_ip_subnet,
};
let mut endpoint_rps_limit = args.endpoint_rps_limit.clone();

View File

@@ -1,6 +1,6 @@
use crate::{
auth,
rate_limiter::{AuthRateLimiter, RateBucketInfo},
auth::{self, backend::AuthRateLimiter},
rate_limiter::RateBucketInfo,
serverless::GlobalConnPoolOptions,
};
use anyhow::{bail, ensure, Context, Ok};
@@ -58,6 +58,7 @@ pub struct AuthenticationConfig {
pub scram_protocol_timeout: tokio::time::Duration,
pub rate_limiter_enabled: bool,
pub rate_limiter: AuthRateLimiter,
pub rate_limit_ip_subnet: u8,
}
impl TlsConfig {

View File

@@ -4,4 +4,4 @@ mod limiter;
pub use aimd::Aimd;
pub use limit_algorithm::{AimdConfig, Fixed, RateLimitAlgorithm, RateLimiterConfig};
pub use limiter::Limiter;
pub use limiter::{AuthRateLimiter, EndpointRateLimiter, GlobalRateLimiter, RateBucketInfo};
pub use limiter::{BucketRateLimiter, EndpointRateLimiter, GlobalRateLimiter, RateBucketInfo};

View File

@@ -2,7 +2,6 @@ use std::{
borrow::Cow,
collections::hash_map::RandomState,
hash::{BuildHasher, Hash},
net::IpAddr,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
@@ -18,11 +17,8 @@ use tokio::time::{timeout, Duration, Instant};
use tracing::info;
use crate::{
intern::EndpointIdInt,
{
metrics::{Metrics, RateLimit},
EndpointId,
},
metrics::{Metrics, RateLimit},
EndpointId,
};
use super::{
@@ -81,9 +77,6 @@ impl GlobalRateLimiter {
// I went with a more expensive way that yields user-friendlier error messages.
pub type EndpointRateLimiter = BucketRateLimiter<EndpointId, StdRng, RandomState>;
// This can't be just per IP because that would limit some PaaS that share IP addresses
pub type AuthRateLimiter = BucketRateLimiter<(EndpointIdInt, IpAddr), StdRng, RandomState>;
pub struct BucketRateLimiter<Key, Rand = StdRng, Hasher = RandomState> {
map: DashMap<Key, Vec<RateBucket>, Hasher>,
info: Cow<'static, [RateBucketInfo]>,
@@ -155,19 +148,6 @@ impl RateBucketInfo {
Self::new(100, Duration::from_secs(600)),
];
/// All of these are per endpoint-ip pair.
/// Context: 4096 rounds of pbkdf2 take about 1ms of cpu time to execute (1 milli-cpu-second or 1mcpus).
///
/// First bucket: 300mcpus total per endpoint-ip pair
/// * 1228800 requests per second with 1 hash rounds. (endpoint rate limiter will catch this first)
/// * 300 requests per second with 4096 hash rounds.
/// * 2 requests per second with 600000 hash rounds.
pub const DEFAULT_AUTH_SET: [Self; 3] = [
Self::new(300 * 4096, Duration::from_secs(1)),
Self::new(200 * 4096, Duration::from_secs(60)),
Self::new(100 * 4096, Duration::from_secs(600)),
];
pub fn validate(info: &mut [Self]) -> anyhow::Result<()> {
info.sort_unstable_by_key(|info| info.interval);
let invalid = info
@@ -783,31 +763,4 @@ mod tests {
}
assert!(limiter.map.len() < 150_000);
}
#[test]
fn test_default_auth_set() {
// these values used to exceed u32::MAX
assert_eq!(
RateBucketInfo::DEFAULT_AUTH_SET,
[
RateBucketInfo {
interval: Duration::from_secs(1),
max_rpi: 300 * 4096,
},
RateBucketInfo {
interval: Duration::from_secs(60),
max_rpi: 200 * 4096 * 60,
},
RateBucketInfo {
interval: Duration::from_secs(600),
max_rpi: 100 * 4096 * 600,
}
]
);
for x in RateBucketInfo::DEFAULT_AUTH_SET {
let y = x.to_string().parse().unwrap();
assert_eq!(x, y);
}
}
}

View File

@@ -6,7 +6,7 @@ use tracing::{field::display, info};
use crate::{
auth::{backend::ComputeCredentials, check_peer_addr_is_in_list, AuthError},
compute,
config::ProxyConfig,
config::{AuthenticationConfig, ProxyConfig},
console::{
errors::{GetAuthInfoError, WakeComputeError},
CachedNodeInfo,
@@ -27,6 +27,7 @@ impl PoolingBackend {
pub async fn authenticate(
&self,
ctx: &mut RequestMonitoring,
config: &AuthenticationConfig,
conn_info: &ConnInfo,
) -> Result<ComputeCredentials, AuthError> {
let user_info = conn_info.user_info.clone();
@@ -43,6 +44,7 @@ impl PoolingBackend {
let secret = match cached_secret.value.clone() {
Some(secret) => self.config.authentication_config.check_rate_limit(
ctx,
config,
secret,
&user_info.endpoint,
true,

View File

@@ -541,7 +541,9 @@ async fn handle_inner(
.map_err(SqlOverHttpError::from);
let authenticate_and_connect = async {
let keys = backend.authenticate(ctx, &conn_info).await?;
let keys = backend
.authenticate(ctx, &config.authentication_config, &conn_info)
.await?;
let client = backend
.connect_to_compute(ctx, conn_info, keys, !allow_pool)
.await?;