From 3b1ac8b14a0f48c71780f3cb3d607ee7287093f7 Mon Sep 17 00:00:00 2001 From: Ivan Efremov Date: Fri, 22 Nov 2024 18:46:38 +0200 Subject: [PATCH] proxy: Implement cancellation rate limiting (#9739) Implement cancellation rate limiting and ip allowlist checks. Add ip_allowlist to the cancel closure Fixes [#16456](https://github.com/neondatabase/cloud/issues/16456) --- Cargo.lock | 4 +- Cargo.toml | 2 +- proxy/src/auth/backend/console_redirect.rs | 20 +++-- proxy/src/auth/backend/mod.rs | 18 +---- proxy/src/bin/proxy.rs | 3 +- proxy/src/cancellation.rs | 87 +++++++++++++++++++++- proxy/src/compute.rs | 2 +- proxy/src/console_redirect_proxy.rs | 13 +++- proxy/src/metrics.rs | 1 + proxy/src/proxy/mod.rs | 8 +- proxy/src/rate_limiter/limiter.rs | 19 ++++- proxy/src/rate_limiter/mod.rs | 3 +- proxy/src/redis/cancellation_publisher.rs | 27 +++++-- proxy/src/redis/notifications.rs | 15 +++- 14 files changed, 173 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b1232a6b6a..a25fa89c77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2838,9 +2838,9 @@ dependencies = [ [[package]] name = "ipnet" -version = "2.9.0" +version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" [[package]] name = "is-terminal" diff --git a/Cargo.toml b/Cargo.toml index c6b4b62042..aac19a4122 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,7 +106,7 @@ hyper-util = "0.1" tokio-tungstenite = "0.21.0" indexmap = "2" indoc = "2" -ipnet = "2.9.0" +ipnet = "2.10.0" itertools = "0.10" itoa = "1.0.11" jsonwebtoken = "9" diff --git a/proxy/src/auth/backend/console_redirect.rs b/proxy/src/auth/backend/console_redirect.rs index 5772471486..bf7a1cb070 100644 --- a/proxy/src/auth/backend/console_redirect.rs +++ b/proxy/src/auth/backend/console_redirect.rs @@ -6,6 +6,7 @@ use tokio_postgres::config::SslMode; use tracing::{info, info_span}; use super::ComputeCredentialKeys; +use crate::auth::IpPattern; use crate::cache::Cached; use crate::config::AuthenticationConfig; use crate::context::RequestContext; @@ -74,10 +75,10 @@ impl ConsoleRedirectBackend { ctx: &RequestContext, auth_config: &'static AuthenticationConfig, client: &mut PqStream, - ) -> auth::Result { + ) -> auth::Result<(ConsoleRedirectNodeInfo, Option>)> { authenticate(ctx, auth_config, &self.console_uri, client) .await - .map(ConsoleRedirectNodeInfo) + .map(|(node_info, ip_allowlist)| (ConsoleRedirectNodeInfo(node_info), ip_allowlist)) } } @@ -102,7 +103,7 @@ async fn authenticate( auth_config: &'static AuthenticationConfig, link_uri: &reqwest::Url, client: &mut PqStream, -) -> auth::Result { +) -> auth::Result<(NodeInfo, Option>)> { ctx.set_auth_method(crate::context::AuthMethod::ConsoleRedirect); // registering waiter can fail if we get unlucky with rng. @@ -176,9 +177,12 @@ async fn authenticate( config.password(password.as_ref()); } - Ok(NodeInfo { - config, - aux: db_info.aux, - allow_self_signed_compute: false, // caller may override - }) + Ok(( + NodeInfo { + config, + aux: db_info.aux, + allow_self_signed_compute: false, // caller may override + }, + db_info.allowed_ips, + )) } diff --git a/proxy/src/auth/backend/mod.rs b/proxy/src/auth/backend/mod.rs index 57ecd5e499..7e1b26a11a 100644 --- a/proxy/src/auth/backend/mod.rs +++ b/proxy/src/auth/backend/mod.rs @@ -6,7 +6,6 @@ pub mod local; use std::net::IpAddr; use std::sync::Arc; -use std::time::Duration; pub use console_redirect::ConsoleRedirectBackend; pub(crate) use console_redirect::ConsoleRedirectError; @@ -30,7 +29,7 @@ use crate::intern::EndpointIdInt; use crate::metrics::Metrics; use crate::proxy::connect_compute::ComputeConnectBackend; use crate::proxy::NeonOptions; -use crate::rate_limiter::{BucketRateLimiter, EndpointRateLimiter, RateBucketInfo}; +use crate::rate_limiter::{BucketRateLimiter, EndpointRateLimiter}; use crate::stream::Stream; use crate::types::{EndpointCacheKey, EndpointId, RoleName}; use crate::{scram, stream}; @@ -192,21 +191,6 @@ impl MaskedIp { // This can't be just per IP because that would limit some PaaS that share IP addresses pub type AuthRateLimiter = BucketRateLimiter<(EndpointIdInt, MaskedIp)>; -impl RateBucketInfo { - /// All of these are per endpoint-maskedip pair. - /// Context: 4096 rounds of pbkdf2 take about 1ms of cpu time to execute (1 milli-cpu-second or 1mcpus). - /// - /// First bucket: 1000mcpus total per endpoint-ip pair - /// * 4096000 requests per second with 1 hash rounds. - /// * 1000 requests per second with 4096 hash rounds. - /// * 6.8 requests per second with 600000 hash rounds. - pub const DEFAULT_AUTH_SET: [Self; 3] = [ - Self::new(1000 * 4096, Duration::from_secs(1)), - Self::new(600 * 4096, Duration::from_secs(60)), - Self::new(300 * 4096, Duration::from_secs(600)), - ]; -} - impl AuthenticationConfig { pub(crate) fn check_rate_limit( &self, diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index 45fbe4a398..a935378162 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -428,8 +428,9 @@ async fn main() -> anyhow::Result<()> { )?))), None => None, }; + let cancellation_handler = Arc::new(CancellationHandler::< - Option>>, + Option>>, >::new( cancel_map.clone(), redis_publisher, diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index 3ad2d55b53..4b72a66e63 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -10,16 +10,23 @@ use tokio_postgres::{CancelToken, NoTls}; use tracing::{debug, info}; use uuid::Uuid; +use crate::auth::{check_peer_addr_is_in_list, IpPattern}; use crate::error::ReportableError; use crate::metrics::{CancellationRequest, CancellationSource, Metrics}; +use crate::rate_limiter::LeakyBucketRateLimiter; use crate::redis::cancellation_publisher::{ CancellationPublisher, CancellationPublisherMut, RedisPublisherClient, }; +use std::net::IpAddr; + +use ipnet::{IpNet, Ipv4Net, Ipv6Net}; pub type CancelMap = Arc>>; pub type CancellationHandlerMain = CancellationHandler>>>; pub(crate) type CancellationHandlerMainInternal = Option>>; +type IpSubnetKey = IpNet; + /// Enables serving `CancelRequest`s. /// /// If `CancellationPublisher` is available, cancel request will be used to publish the cancellation key to other proxy instances. @@ -29,14 +36,23 @@ pub struct CancellationHandler

{ /// This field used for the monitoring purposes. /// Represents the source of the cancellation request. from: CancellationSource, + // rate limiter of cancellation requests + limiter: Arc>>, } #[derive(Debug, Error)] pub(crate) enum CancelError { #[error("{0}")] IO(#[from] std::io::Error), + #[error("{0}")] Postgres(#[from] tokio_postgres::Error), + + #[error("rate limit exceeded")] + RateLimit, + + #[error("IP is not allowed")] + IpNotAllowed, } impl ReportableError for CancelError { @@ -47,6 +63,8 @@ impl ReportableError for CancelError { crate::error::ErrorKind::Postgres } CancelError::Postgres(_) => crate::error::ErrorKind::Compute, + CancelError::RateLimit => crate::error::ErrorKind::RateLimit, + CancelError::IpNotAllowed => crate::error::ErrorKind::User, } } } @@ -79,13 +97,36 @@ impl CancellationHandler

{ cancellation_handler: self, } } + /// Try to cancel a running query for the corresponding connection. /// If the cancellation key is not found, it will be published to Redis. + /// check_allowed - if true, check if the IP is allowed to cancel the query pub(crate) async fn cancel_session( &self, key: CancelKeyData, session_id: Uuid, + peer_addr: &IpAddr, + check_allowed: bool, ) -> Result<(), CancelError> { + // TODO: check for unspecified address is only for backward compatibility, should be removed + if !peer_addr.is_unspecified() { + let subnet_key = match *peer_addr { + IpAddr::V4(ip) => IpNet::V4(Ipv4Net::new_assert(ip, 24).trunc()), // use defaut mask here + IpAddr::V6(ip) => IpNet::V6(Ipv6Net::new_assert(ip, 64).trunc()), + }; + if !self.limiter.lock().unwrap().check(subnet_key, 1) { + tracing::debug!("Rate limit exceeded. Skipping cancellation message"); + Metrics::get() + .proxy + .cancellation_requests_total + .inc(CancellationRequest { + source: self.from, + kind: crate::metrics::CancellationOutcome::RateLimitExceeded, + }); + return Err(CancelError::RateLimit); + } + } + // NB: we should immediately release the lock after cloning the token. let Some(cancel_closure) = self.map.get(&key).and_then(|x| x.clone()) else { tracing::warn!("query cancellation key not found: {key}"); @@ -96,7 +137,13 @@ impl CancellationHandler

{ source: self.from, kind: crate::metrics::CancellationOutcome::NotFound, }); - match self.client.try_publish(key, session_id).await { + + if session_id == Uuid::nil() { + // was already published, do not publish it again + return Ok(()); + } + + match self.client.try_publish(key, session_id, *peer_addr).await { Ok(()) => {} // do nothing Err(e) => { return Err(CancelError::IO(std::io::Error::new( @@ -107,6 +154,13 @@ impl CancellationHandler

{ } return Ok(()); }; + + if check_allowed + && !check_peer_addr_is_in_list(peer_addr, cancel_closure.ip_allowlist.as_slice()) + { + return Err(CancelError::IpNotAllowed); + } + Metrics::get() .proxy .cancellation_requests_total @@ -135,13 +189,29 @@ impl CancellationHandler<()> { map, client: (), from, + limiter: Arc::new(std::sync::Mutex::new( + LeakyBucketRateLimiter::::new_with_shards( + LeakyBucketRateLimiter::::DEFAULT, + 64, + ), + )), } } } impl CancellationHandler>>> { pub fn new(map: CancelMap, client: Option>>, from: CancellationSource) -> Self { - Self { map, client, from } + Self { + map, + client, + from, + limiter: Arc::new(std::sync::Mutex::new( + LeakyBucketRateLimiter::::new_with_shards( + LeakyBucketRateLimiter::::DEFAULT, + 64, + ), + )), + } } } @@ -152,13 +222,19 @@ impl CancellationHandler>>> { pub struct CancelClosure { socket_addr: SocketAddr, cancel_token: CancelToken, + ip_allowlist: Vec, } impl CancelClosure { - pub(crate) fn new(socket_addr: SocketAddr, cancel_token: CancelToken) -> Self { + pub(crate) fn new( + socket_addr: SocketAddr, + cancel_token: CancelToken, + ip_allowlist: Vec, + ) -> Self { Self { socket_addr, cancel_token, + ip_allowlist, } } /// Cancels the query running on user's compute node. @@ -168,6 +244,9 @@ impl CancelClosure { debug!("query was cancelled"); Ok(()) } + pub(crate) fn set_ip_allowlist(&mut self, ip_allowlist: Vec) { + self.ip_allowlist = ip_allowlist; + } } /// Helper for registering query cancellation tokens. @@ -229,6 +308,8 @@ mod tests { cancel_key: 0, }, Uuid::new_v4(), + &("127.0.0.1".parse().unwrap()), + true, ) .await .unwrap(); diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index e7fbe9ab47..8408d4720b 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -342,7 +342,7 @@ impl ConnCfg { // NB: CancelToken is supposed to hold socket_addr, but we use connect_raw. // Yet another reason to rework the connection establishing code. - let cancel_closure = CancelClosure::new(socket_addr, client.cancel_token()); + let cancel_closure = CancelClosure::new(socket_addr, client.cancel_token(), vec![]); let connection = PostgresConnection { stream, diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs index c88b2936db..fbd0c8e5c5 100644 --- a/proxy/src/console_redirect_proxy.rs +++ b/proxy/src/console_redirect_proxy.rs @@ -156,16 +156,21 @@ pub(crate) async fn handle_client( let request_gauge = metrics.connection_requests.guard(proto); let tls = config.tls_config.as_ref(); - let record_handshake_error = !ctx.has_private_peer_addr(); let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client); let do_handshake = handshake(ctx, stream, tls, record_handshake_error); + let (mut stream, params) = match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? { HandshakeData::Startup(stream, params) => (stream, params), HandshakeData::Cancel(cancel_key_data) => { return Ok(cancellation_handler - .cancel_session(cancel_key_data, ctx.session_id()) + .cancel_session( + cancel_key_data, + ctx.session_id(), + &ctx.peer_addr(), + config.authentication_config.ip_allowlist_check_enabled, + ) .await .map(|()| None)?) } @@ -174,7 +179,7 @@ pub(crate) async fn handle_client( ctx.set_db_options(params.clone()); - let user_info = match backend + let (user_info, ip_allowlist) = match backend .authenticate(ctx, &config.authentication_config, &mut stream) .await { @@ -198,6 +203,8 @@ pub(crate) async fn handle_client( .or_else(|e| stream.throw_error(e)) .await?; + node.cancel_closure + .set_ip_allowlist(ip_allowlist.unwrap_or_default()); let session = cancellation_handler.get_session(); prepare_client_connection(&node, &session, &mut stream).await?; diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index f91fcd4120..659c57c865 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -351,6 +351,7 @@ pub enum CancellationSource { pub enum CancellationOutcome { NotFound, Found, + RateLimitExceeded, } #[derive(LabelGroup)] diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index 9415b54a4a..5d9468d89a 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -268,12 +268,18 @@ pub(crate) async fn handle_client( let record_handshake_error = !ctx.has_private_peer_addr(); let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client); let do_handshake = handshake(ctx, stream, mode.handshake_tls(tls), record_handshake_error); + let (mut stream, params) = match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? { HandshakeData::Startup(stream, params) => (stream, params), HandshakeData::Cancel(cancel_key_data) => { return Ok(cancellation_handler - .cancel_session(cancel_key_data, ctx.session_id()) + .cancel_session( + cancel_key_data, + ctx.session_id(), + &ctx.peer_addr(), + config.authentication_config.ip_allowlist_check_enabled, + ) .await .map(|()| None)?) } diff --git a/proxy/src/rate_limiter/limiter.rs b/proxy/src/rate_limiter/limiter.rs index 4259fd04f4..a048721e77 100644 --- a/proxy/src/rate_limiter/limiter.rs +++ b/proxy/src/rate_limiter/limiter.rs @@ -14,13 +14,13 @@ use tracing::info; use crate::intern::EndpointIdInt; -pub(crate) struct GlobalRateLimiter { +pub struct GlobalRateLimiter { data: Vec, info: Vec, } impl GlobalRateLimiter { - pub(crate) fn new(info: Vec) -> Self { + pub fn new(info: Vec) -> Self { Self { data: vec![ RateBucket { @@ -34,7 +34,7 @@ impl GlobalRateLimiter { } /// Check that number of connections is below `max_rps` rps. - pub(crate) fn check(&mut self) -> bool { + pub fn check(&mut self) -> bool { let now = Instant::now(); let should_allow_request = self @@ -137,6 +137,19 @@ impl RateBucketInfo { Self::new(200, Duration::from_secs(600)), ]; + /// All of these are per endpoint-maskedip pair. + /// Context: 4096 rounds of pbkdf2 take about 1ms of cpu time to execute (1 milli-cpu-second or 1mcpus). + /// + /// First bucket: 1000mcpus total per endpoint-ip pair + /// * 4096000 requests per second with 1 hash rounds. + /// * 1000 requests per second with 4096 hash rounds. + /// * 6.8 requests per second with 600000 hash rounds. + pub const DEFAULT_AUTH_SET: [Self; 3] = [ + Self::new(1000 * 4096, Duration::from_secs(1)), + Self::new(600 * 4096, Duration::from_secs(60)), + Self::new(300 * 4096, Duration::from_secs(600)), + ]; + pub fn rps(&self) -> f64 { (self.max_rpi as f64) / self.interval.as_secs_f64() } diff --git a/proxy/src/rate_limiter/mod.rs b/proxy/src/rate_limiter/mod.rs index 3ae2ecaf8f..5f90102da3 100644 --- a/proxy/src/rate_limiter/mod.rs +++ b/proxy/src/rate_limiter/mod.rs @@ -8,5 +8,4 @@ pub(crate) use limit_algorithm::aimd::Aimd; pub(crate) use limit_algorithm::{ DynamicLimiter, Outcome, RateLimitAlgorithm, RateLimiterConfig, Token, }; -pub(crate) use limiter::GlobalRateLimiter; -pub use limiter::{BucketRateLimiter, RateBucketInfo, WakeComputeRateLimiter}; +pub use limiter::{BucketRateLimiter, GlobalRateLimiter, RateBucketInfo, WakeComputeRateLimiter}; diff --git a/proxy/src/redis/cancellation_publisher.rs b/proxy/src/redis/cancellation_publisher.rs index 7392b0d316..633a2f1b81 100644 --- a/proxy/src/redis/cancellation_publisher.rs +++ b/proxy/src/redis/cancellation_publisher.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use core::net::IpAddr; use pq_proto::CancelKeyData; use redis::AsyncCommands; use tokio::sync::Mutex; @@ -15,6 +16,7 @@ pub trait CancellationPublisherMut: Send + Sync + 'static { &mut self, cancel_key_data: CancelKeyData, session_id: Uuid, + peer_addr: IpAddr, ) -> anyhow::Result<()>; } @@ -24,6 +26,7 @@ pub trait CancellationPublisher: Send + Sync + 'static { &self, cancel_key_data: CancelKeyData, session_id: Uuid, + peer_addr: IpAddr, ) -> anyhow::Result<()>; } @@ -32,6 +35,7 @@ impl CancellationPublisher for () { &self, _cancel_key_data: CancelKeyData, _session_id: Uuid, + _peer_addr: IpAddr, ) -> anyhow::Result<()> { Ok(()) } @@ -42,8 +46,10 @@ impl CancellationPublisherMut for P { &mut self, cancel_key_data: CancelKeyData, session_id: Uuid, + peer_addr: IpAddr, ) -> anyhow::Result<()> { -

::try_publish(self, cancel_key_data, session_id).await +

::try_publish(self, cancel_key_data, session_id, peer_addr) + .await } } @@ -52,9 +58,10 @@ impl CancellationPublisher for Option

{ &self, cancel_key_data: CancelKeyData, session_id: Uuid, + peer_addr: IpAddr, ) -> anyhow::Result<()> { if let Some(p) = self { - p.try_publish(cancel_key_data, session_id).await + p.try_publish(cancel_key_data, session_id, peer_addr).await } else { Ok(()) } @@ -66,10 +73,11 @@ impl CancellationPublisher for Arc> { &self, cancel_key_data: CancelKeyData, session_id: Uuid, + peer_addr: IpAddr, ) -> anyhow::Result<()> { self.lock() .await - .try_publish(cancel_key_data, session_id) + .try_publish(cancel_key_data, session_id, peer_addr) .await } } @@ -97,11 +105,13 @@ impl RedisPublisherClient { &mut self, cancel_key_data: CancelKeyData, session_id: Uuid, + peer_addr: IpAddr, ) -> anyhow::Result<()> { let payload = serde_json::to_string(&Notification::Cancel(CancelSession { region_id: Some(self.region_id.clone()), cancel_key_data, session_id, + peer_addr: Some(peer_addr), }))?; let _: () = self.client.publish(PROXY_CHANNEL_NAME, payload).await?; Ok(()) @@ -120,13 +130,14 @@ impl RedisPublisherClient { &mut self, cancel_key_data: CancelKeyData, session_id: Uuid, + peer_addr: IpAddr, ) -> anyhow::Result<()> { // TODO: review redundant error duplication logs. if !self.limiter.check() { tracing::info!("Rate limit exceeded. Skipping cancellation message"); return Err(anyhow::anyhow!("Rate limit exceeded")); } - match self.publish(cancel_key_data, session_id).await { + match self.publish(cancel_key_data, session_id, peer_addr).await { Ok(()) => return Ok(()), Err(e) => { tracing::error!("failed to publish a message: {e}"); @@ -134,7 +145,7 @@ impl RedisPublisherClient { } tracing::info!("Publisher is disconnected. Reconnectiong..."); self.try_connect().await?; - self.publish(cancel_key_data, session_id).await + self.publish(cancel_key_data, session_id, peer_addr).await } } @@ -143,9 +154,13 @@ impl CancellationPublisherMut for RedisPublisherClient { &mut self, cancel_key_data: CancelKeyData, session_id: Uuid, + peer_addr: IpAddr, ) -> anyhow::Result<()> { tracing::info!("publishing cancellation key to Redis"); - match self.try_publish_internal(cancel_key_data, session_id).await { + match self + .try_publish_internal(cancel_key_data, session_id, peer_addr) + .await + { Ok(()) => { tracing::debug!("cancellation key successfuly published to Redis"); Ok(()) diff --git a/proxy/src/redis/notifications.rs b/proxy/src/redis/notifications.rs index 62e7b1b565..65008ae943 100644 --- a/proxy/src/redis/notifications.rs +++ b/proxy/src/redis/notifications.rs @@ -60,6 +60,7 @@ pub(crate) struct CancelSession { pub(crate) region_id: Option, pub(crate) cancel_key_data: CancelKeyData, pub(crate) session_id: Uuid, + pub(crate) peer_addr: Option, } fn deserialize_json_string<'de, D, T>(deserializer: D) -> Result @@ -137,10 +138,20 @@ impl MessageHandler { return Ok(()); } } + + // TODO: Remove unspecified peer_addr after the complete migration to the new format + let peer_addr = cancel_session + .peer_addr + .unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED)); // This instance of cancellation_handler doesn't have a RedisPublisherClient so it can't publish the message. match self .cancellation_handler - .cancel_session(cancel_session.cancel_key_data, uuid::Uuid::nil()) + .cancel_session( + cancel_session.cancel_key_data, + uuid::Uuid::nil(), + &peer_addr, + cancel_session.peer_addr.is_some(), + ) .await { Ok(()) => {} @@ -335,6 +346,7 @@ mod tests { cancel_key_data, region_id: None, session_id: uuid, + peer_addr: None, }); let text = serde_json::to_string(&msg)?; let result: Notification = serde_json::from_str(&text)?; @@ -344,6 +356,7 @@ mod tests { cancel_key_data, region_id: Some("region".to_string()), session_id: uuid, + peer_addr: None, }); let text = serde_json::to_string(&msg)?; let result: Notification = serde_json::from_str(&text)?;