mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-03 19:42:55 +00:00
proxy: Implement cancellation rate limiting (#9739)
Implement cancellation rate limiting and ip allowlist checks. Add ip_allowlist to the cancel closure Fixes [#16456](https://github.com/neondatabase/cloud/issues/16456)
This commit is contained in:
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -2838,9 +2838,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "ipnet"
|
||||
version = "2.9.0"
|
||||
version = "2.10.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3"
|
||||
checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708"
|
||||
|
||||
[[package]]
|
||||
name = "is-terminal"
|
||||
|
||||
@@ -106,7 +106,7 @@ hyper-util = "0.1"
|
||||
tokio-tungstenite = "0.21.0"
|
||||
indexmap = "2"
|
||||
indoc = "2"
|
||||
ipnet = "2.9.0"
|
||||
ipnet = "2.10.0"
|
||||
itertools = "0.10"
|
||||
itoa = "1.0.11"
|
||||
jsonwebtoken = "9"
|
||||
|
||||
@@ -6,6 +6,7 @@ use tokio_postgres::config::SslMode;
|
||||
use tracing::{info, info_span};
|
||||
|
||||
use super::ComputeCredentialKeys;
|
||||
use crate::auth::IpPattern;
|
||||
use crate::cache::Cached;
|
||||
use crate::config::AuthenticationConfig;
|
||||
use crate::context::RequestContext;
|
||||
@@ -74,10 +75,10 @@ impl ConsoleRedirectBackend {
|
||||
ctx: &RequestContext,
|
||||
auth_config: &'static AuthenticationConfig,
|
||||
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
|
||||
) -> auth::Result<ConsoleRedirectNodeInfo> {
|
||||
) -> auth::Result<(ConsoleRedirectNodeInfo, Option<Vec<IpPattern>>)> {
|
||||
authenticate(ctx, auth_config, &self.console_uri, client)
|
||||
.await
|
||||
.map(ConsoleRedirectNodeInfo)
|
||||
.map(|(node_info, ip_allowlist)| (ConsoleRedirectNodeInfo(node_info), ip_allowlist))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,7 +103,7 @@ async fn authenticate(
|
||||
auth_config: &'static AuthenticationConfig,
|
||||
link_uri: &reqwest::Url,
|
||||
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
|
||||
) -> auth::Result<NodeInfo> {
|
||||
) -> auth::Result<(NodeInfo, Option<Vec<IpPattern>>)> {
|
||||
ctx.set_auth_method(crate::context::AuthMethod::ConsoleRedirect);
|
||||
|
||||
// registering waiter can fail if we get unlucky with rng.
|
||||
@@ -176,9 +177,12 @@ async fn authenticate(
|
||||
config.password(password.as_ref());
|
||||
}
|
||||
|
||||
Ok(NodeInfo {
|
||||
config,
|
||||
aux: db_info.aux,
|
||||
allow_self_signed_compute: false, // caller may override
|
||||
})
|
||||
Ok((
|
||||
NodeInfo {
|
||||
config,
|
||||
aux: db_info.aux,
|
||||
allow_self_signed_compute: false, // caller may override
|
||||
},
|
||||
db_info.allowed_ips,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -6,7 +6,6 @@ pub mod local;
|
||||
|
||||
use std::net::IpAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
pub use console_redirect::ConsoleRedirectBackend;
|
||||
pub(crate) use console_redirect::ConsoleRedirectError;
|
||||
@@ -30,7 +29,7 @@ use crate::intern::EndpointIdInt;
|
||||
use crate::metrics::Metrics;
|
||||
use crate::proxy::connect_compute::ComputeConnectBackend;
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::rate_limiter::{BucketRateLimiter, EndpointRateLimiter, RateBucketInfo};
|
||||
use crate::rate_limiter::{BucketRateLimiter, EndpointRateLimiter};
|
||||
use crate::stream::Stream;
|
||||
use crate::types::{EndpointCacheKey, EndpointId, RoleName};
|
||||
use crate::{scram, stream};
|
||||
@@ -192,21 +191,6 @@ impl MaskedIp {
|
||||
// This can't be just per IP because that would limit some PaaS that share IP addresses
|
||||
pub type AuthRateLimiter = BucketRateLimiter<(EndpointIdInt, MaskedIp)>;
|
||||
|
||||
impl RateBucketInfo {
|
||||
/// All of these are per endpoint-maskedip pair.
|
||||
/// Context: 4096 rounds of pbkdf2 take about 1ms of cpu time to execute (1 milli-cpu-second or 1mcpus).
|
||||
///
|
||||
/// First bucket: 1000mcpus total per endpoint-ip pair
|
||||
/// * 4096000 requests per second with 1 hash rounds.
|
||||
/// * 1000 requests per second with 4096 hash rounds.
|
||||
/// * 6.8 requests per second with 600000 hash rounds.
|
||||
pub const DEFAULT_AUTH_SET: [Self; 3] = [
|
||||
Self::new(1000 * 4096, Duration::from_secs(1)),
|
||||
Self::new(600 * 4096, Duration::from_secs(60)),
|
||||
Self::new(300 * 4096, Duration::from_secs(600)),
|
||||
];
|
||||
}
|
||||
|
||||
impl AuthenticationConfig {
|
||||
pub(crate) fn check_rate_limit(
|
||||
&self,
|
||||
|
||||
@@ -428,8 +428,9 @@ async fn main() -> anyhow::Result<()> {
|
||||
)?))),
|
||||
None => None,
|
||||
};
|
||||
|
||||
let cancellation_handler = Arc::new(CancellationHandler::<
|
||||
Option<Arc<tokio::sync::Mutex<RedisPublisherClient>>>,
|
||||
Option<Arc<Mutex<RedisPublisherClient>>>,
|
||||
>::new(
|
||||
cancel_map.clone(),
|
||||
redis_publisher,
|
||||
|
||||
@@ -10,16 +10,23 @@ use tokio_postgres::{CancelToken, NoTls};
|
||||
use tracing::{debug, info};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::auth::{check_peer_addr_is_in_list, IpPattern};
|
||||
use crate::error::ReportableError;
|
||||
use crate::metrics::{CancellationRequest, CancellationSource, Metrics};
|
||||
use crate::rate_limiter::LeakyBucketRateLimiter;
|
||||
use crate::redis::cancellation_publisher::{
|
||||
CancellationPublisher, CancellationPublisherMut, RedisPublisherClient,
|
||||
};
|
||||
use std::net::IpAddr;
|
||||
|
||||
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
|
||||
|
||||
pub type CancelMap = Arc<DashMap<CancelKeyData, Option<CancelClosure>>>;
|
||||
pub type CancellationHandlerMain = CancellationHandler<Option<Arc<Mutex<RedisPublisherClient>>>>;
|
||||
pub(crate) type CancellationHandlerMainInternal = Option<Arc<Mutex<RedisPublisherClient>>>;
|
||||
|
||||
type IpSubnetKey = IpNet;
|
||||
|
||||
/// Enables serving `CancelRequest`s.
|
||||
///
|
||||
/// If `CancellationPublisher` is available, cancel request will be used to publish the cancellation key to other proxy instances.
|
||||
@@ -29,14 +36,23 @@ pub struct CancellationHandler<P> {
|
||||
/// This field used for the monitoring purposes.
|
||||
/// Represents the source of the cancellation request.
|
||||
from: CancellationSource,
|
||||
// rate limiter of cancellation requests
|
||||
limiter: Arc<std::sync::Mutex<LeakyBucketRateLimiter<IpSubnetKey>>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub(crate) enum CancelError {
|
||||
#[error("{0}")]
|
||||
IO(#[from] std::io::Error),
|
||||
|
||||
#[error("{0}")]
|
||||
Postgres(#[from] tokio_postgres::Error),
|
||||
|
||||
#[error("rate limit exceeded")]
|
||||
RateLimit,
|
||||
|
||||
#[error("IP is not allowed")]
|
||||
IpNotAllowed,
|
||||
}
|
||||
|
||||
impl ReportableError for CancelError {
|
||||
@@ -47,6 +63,8 @@ impl ReportableError for CancelError {
|
||||
crate::error::ErrorKind::Postgres
|
||||
}
|
||||
CancelError::Postgres(_) => crate::error::ErrorKind::Compute,
|
||||
CancelError::RateLimit => crate::error::ErrorKind::RateLimit,
|
||||
CancelError::IpNotAllowed => crate::error::ErrorKind::User,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -79,13 +97,36 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
|
||||
cancellation_handler: self,
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to cancel a running query for the corresponding connection.
|
||||
/// If the cancellation key is not found, it will be published to Redis.
|
||||
/// check_allowed - if true, check if the IP is allowed to cancel the query
|
||||
pub(crate) async fn cancel_session(
|
||||
&self,
|
||||
key: CancelKeyData,
|
||||
session_id: Uuid,
|
||||
peer_addr: &IpAddr,
|
||||
check_allowed: bool,
|
||||
) -> Result<(), CancelError> {
|
||||
// TODO: check for unspecified address is only for backward compatibility, should be removed
|
||||
if !peer_addr.is_unspecified() {
|
||||
let subnet_key = match *peer_addr {
|
||||
IpAddr::V4(ip) => IpNet::V4(Ipv4Net::new_assert(ip, 24).trunc()), // use defaut mask here
|
||||
IpAddr::V6(ip) => IpNet::V6(Ipv6Net::new_assert(ip, 64).trunc()),
|
||||
};
|
||||
if !self.limiter.lock().unwrap().check(subnet_key, 1) {
|
||||
tracing::debug!("Rate limit exceeded. Skipping cancellation message");
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.cancellation_requests_total
|
||||
.inc(CancellationRequest {
|
||||
source: self.from,
|
||||
kind: crate::metrics::CancellationOutcome::RateLimitExceeded,
|
||||
});
|
||||
return Err(CancelError::RateLimit);
|
||||
}
|
||||
}
|
||||
|
||||
// NB: we should immediately release the lock after cloning the token.
|
||||
let Some(cancel_closure) = self.map.get(&key).and_then(|x| x.clone()) else {
|
||||
tracing::warn!("query cancellation key not found: {key}");
|
||||
@@ -96,7 +137,13 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
|
||||
source: self.from,
|
||||
kind: crate::metrics::CancellationOutcome::NotFound,
|
||||
});
|
||||
match self.client.try_publish(key, session_id).await {
|
||||
|
||||
if session_id == Uuid::nil() {
|
||||
// was already published, do not publish it again
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match self.client.try_publish(key, session_id, *peer_addr).await {
|
||||
Ok(()) => {} // do nothing
|
||||
Err(e) => {
|
||||
return Err(CancelError::IO(std::io::Error::new(
|
||||
@@ -107,6 +154,13 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
|
||||
}
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
if check_allowed
|
||||
&& !check_peer_addr_is_in_list(peer_addr, cancel_closure.ip_allowlist.as_slice())
|
||||
{
|
||||
return Err(CancelError::IpNotAllowed);
|
||||
}
|
||||
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.cancellation_requests_total
|
||||
@@ -135,13 +189,29 @@ impl CancellationHandler<()> {
|
||||
map,
|
||||
client: (),
|
||||
from,
|
||||
limiter: Arc::new(std::sync::Mutex::new(
|
||||
LeakyBucketRateLimiter::<IpSubnetKey>::new_with_shards(
|
||||
LeakyBucketRateLimiter::<IpSubnetKey>::DEFAULT,
|
||||
64,
|
||||
),
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<P: CancellationPublisherMut> CancellationHandler<Option<Arc<Mutex<P>>>> {
|
||||
pub fn new(map: CancelMap, client: Option<Arc<Mutex<P>>>, from: CancellationSource) -> Self {
|
||||
Self { map, client, from }
|
||||
Self {
|
||||
map,
|
||||
client,
|
||||
from,
|
||||
limiter: Arc::new(std::sync::Mutex::new(
|
||||
LeakyBucketRateLimiter::<IpSubnetKey>::new_with_shards(
|
||||
LeakyBucketRateLimiter::<IpSubnetKey>::DEFAULT,
|
||||
64,
|
||||
),
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -152,13 +222,19 @@ impl<P: CancellationPublisherMut> CancellationHandler<Option<Arc<Mutex<P>>>> {
|
||||
pub struct CancelClosure {
|
||||
socket_addr: SocketAddr,
|
||||
cancel_token: CancelToken,
|
||||
ip_allowlist: Vec<IpPattern>,
|
||||
}
|
||||
|
||||
impl CancelClosure {
|
||||
pub(crate) fn new(socket_addr: SocketAddr, cancel_token: CancelToken) -> Self {
|
||||
pub(crate) fn new(
|
||||
socket_addr: SocketAddr,
|
||||
cancel_token: CancelToken,
|
||||
ip_allowlist: Vec<IpPattern>,
|
||||
) -> Self {
|
||||
Self {
|
||||
socket_addr,
|
||||
cancel_token,
|
||||
ip_allowlist,
|
||||
}
|
||||
}
|
||||
/// Cancels the query running on user's compute node.
|
||||
@@ -168,6 +244,9 @@ impl CancelClosure {
|
||||
debug!("query was cancelled");
|
||||
Ok(())
|
||||
}
|
||||
pub(crate) fn set_ip_allowlist(&mut self, ip_allowlist: Vec<IpPattern>) {
|
||||
self.ip_allowlist = ip_allowlist;
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper for registering query cancellation tokens.
|
||||
@@ -229,6 +308,8 @@ mod tests {
|
||||
cancel_key: 0,
|
||||
},
|
||||
Uuid::new_v4(),
|
||||
&("127.0.0.1".parse().unwrap()),
|
||||
true,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -342,7 +342,7 @@ impl ConnCfg {
|
||||
|
||||
// NB: CancelToken is supposed to hold socket_addr, but we use connect_raw.
|
||||
// Yet another reason to rework the connection establishing code.
|
||||
let cancel_closure = CancelClosure::new(socket_addr, client.cancel_token());
|
||||
let cancel_closure = CancelClosure::new(socket_addr, client.cancel_token(), vec![]);
|
||||
|
||||
let connection = PostgresConnection {
|
||||
stream,
|
||||
|
||||
@@ -156,16 +156,21 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
let request_gauge = metrics.connection_requests.guard(proto);
|
||||
|
||||
let tls = config.tls_config.as_ref();
|
||||
|
||||
let record_handshake_error = !ctx.has_private_peer_addr();
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
|
||||
let do_handshake = handshake(ctx, stream, tls, record_handshake_error);
|
||||
|
||||
let (mut stream, params) =
|
||||
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {
|
||||
HandshakeData::Startup(stream, params) => (stream, params),
|
||||
HandshakeData::Cancel(cancel_key_data) => {
|
||||
return Ok(cancellation_handler
|
||||
.cancel_session(cancel_key_data, ctx.session_id())
|
||||
.cancel_session(
|
||||
cancel_key_data,
|
||||
ctx.session_id(),
|
||||
&ctx.peer_addr(),
|
||||
config.authentication_config.ip_allowlist_check_enabled,
|
||||
)
|
||||
.await
|
||||
.map(|()| None)?)
|
||||
}
|
||||
@@ -174,7 +179,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
|
||||
ctx.set_db_options(params.clone());
|
||||
|
||||
let user_info = match backend
|
||||
let (user_info, ip_allowlist) = match backend
|
||||
.authenticate(ctx, &config.authentication_config, &mut stream)
|
||||
.await
|
||||
{
|
||||
@@ -198,6 +203,8 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
.or_else(|e| stream.throw_error(e))
|
||||
.await?;
|
||||
|
||||
node.cancel_closure
|
||||
.set_ip_allowlist(ip_allowlist.unwrap_or_default());
|
||||
let session = cancellation_handler.get_session();
|
||||
prepare_client_connection(&node, &session, &mut stream).await?;
|
||||
|
||||
|
||||
@@ -351,6 +351,7 @@ pub enum CancellationSource {
|
||||
pub enum CancellationOutcome {
|
||||
NotFound,
|
||||
Found,
|
||||
RateLimitExceeded,
|
||||
}
|
||||
|
||||
#[derive(LabelGroup)]
|
||||
|
||||
@@ -268,12 +268,18 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
let record_handshake_error = !ctx.has_private_peer_addr();
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
|
||||
let do_handshake = handshake(ctx, stream, mode.handshake_tls(tls), record_handshake_error);
|
||||
|
||||
let (mut stream, params) =
|
||||
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {
|
||||
HandshakeData::Startup(stream, params) => (stream, params),
|
||||
HandshakeData::Cancel(cancel_key_data) => {
|
||||
return Ok(cancellation_handler
|
||||
.cancel_session(cancel_key_data, ctx.session_id())
|
||||
.cancel_session(
|
||||
cancel_key_data,
|
||||
ctx.session_id(),
|
||||
&ctx.peer_addr(),
|
||||
config.authentication_config.ip_allowlist_check_enabled,
|
||||
)
|
||||
.await
|
||||
.map(|()| None)?)
|
||||
}
|
||||
|
||||
@@ -14,13 +14,13 @@ use tracing::info;
|
||||
|
||||
use crate::intern::EndpointIdInt;
|
||||
|
||||
pub(crate) struct GlobalRateLimiter {
|
||||
pub struct GlobalRateLimiter {
|
||||
data: Vec<RateBucket>,
|
||||
info: Vec<RateBucketInfo>,
|
||||
}
|
||||
|
||||
impl GlobalRateLimiter {
|
||||
pub(crate) fn new(info: Vec<RateBucketInfo>) -> Self {
|
||||
pub fn new(info: Vec<RateBucketInfo>) -> Self {
|
||||
Self {
|
||||
data: vec![
|
||||
RateBucket {
|
||||
@@ -34,7 +34,7 @@ impl GlobalRateLimiter {
|
||||
}
|
||||
|
||||
/// Check that number of connections is below `max_rps` rps.
|
||||
pub(crate) fn check(&mut self) -> bool {
|
||||
pub fn check(&mut self) -> bool {
|
||||
let now = Instant::now();
|
||||
|
||||
let should_allow_request = self
|
||||
@@ -137,6 +137,19 @@ impl RateBucketInfo {
|
||||
Self::new(200, Duration::from_secs(600)),
|
||||
];
|
||||
|
||||
/// All of these are per endpoint-maskedip pair.
|
||||
/// Context: 4096 rounds of pbkdf2 take about 1ms of cpu time to execute (1 milli-cpu-second or 1mcpus).
|
||||
///
|
||||
/// First bucket: 1000mcpus total per endpoint-ip pair
|
||||
/// * 4096000 requests per second with 1 hash rounds.
|
||||
/// * 1000 requests per second with 4096 hash rounds.
|
||||
/// * 6.8 requests per second with 600000 hash rounds.
|
||||
pub const DEFAULT_AUTH_SET: [Self; 3] = [
|
||||
Self::new(1000 * 4096, Duration::from_secs(1)),
|
||||
Self::new(600 * 4096, Duration::from_secs(60)),
|
||||
Self::new(300 * 4096, Duration::from_secs(600)),
|
||||
];
|
||||
|
||||
pub fn rps(&self) -> f64 {
|
||||
(self.max_rpi as f64) / self.interval.as_secs_f64()
|
||||
}
|
||||
|
||||
@@ -8,5 +8,4 @@ pub(crate) use limit_algorithm::aimd::Aimd;
|
||||
pub(crate) use limit_algorithm::{
|
||||
DynamicLimiter, Outcome, RateLimitAlgorithm, RateLimiterConfig, Token,
|
||||
};
|
||||
pub(crate) use limiter::GlobalRateLimiter;
|
||||
pub use limiter::{BucketRateLimiter, RateBucketInfo, WakeComputeRateLimiter};
|
||||
pub use limiter::{BucketRateLimiter, GlobalRateLimiter, RateBucketInfo, WakeComputeRateLimiter};
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use core::net::IpAddr;
|
||||
use pq_proto::CancelKeyData;
|
||||
use redis::AsyncCommands;
|
||||
use tokio::sync::Mutex;
|
||||
@@ -15,6 +16,7 @@ pub trait CancellationPublisherMut: Send + Sync + 'static {
|
||||
&mut self,
|
||||
cancel_key_data: CancelKeyData,
|
||||
session_id: Uuid,
|
||||
peer_addr: IpAddr,
|
||||
) -> anyhow::Result<()>;
|
||||
}
|
||||
|
||||
@@ -24,6 +26,7 @@ pub trait CancellationPublisher: Send + Sync + 'static {
|
||||
&self,
|
||||
cancel_key_data: CancelKeyData,
|
||||
session_id: Uuid,
|
||||
peer_addr: IpAddr,
|
||||
) -> anyhow::Result<()>;
|
||||
}
|
||||
|
||||
@@ -32,6 +35,7 @@ impl CancellationPublisher for () {
|
||||
&self,
|
||||
_cancel_key_data: CancelKeyData,
|
||||
_session_id: Uuid,
|
||||
_peer_addr: IpAddr,
|
||||
) -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
@@ -42,8 +46,10 @@ impl<P: CancellationPublisher> CancellationPublisherMut for P {
|
||||
&mut self,
|
||||
cancel_key_data: CancelKeyData,
|
||||
session_id: Uuid,
|
||||
peer_addr: IpAddr,
|
||||
) -> anyhow::Result<()> {
|
||||
<P as CancellationPublisher>::try_publish(self, cancel_key_data, session_id).await
|
||||
<P as CancellationPublisher>::try_publish(self, cancel_key_data, session_id, peer_addr)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,9 +58,10 @@ impl<P: CancellationPublisher> CancellationPublisher for Option<P> {
|
||||
&self,
|
||||
cancel_key_data: CancelKeyData,
|
||||
session_id: Uuid,
|
||||
peer_addr: IpAddr,
|
||||
) -> anyhow::Result<()> {
|
||||
if let Some(p) = self {
|
||||
p.try_publish(cancel_key_data, session_id).await
|
||||
p.try_publish(cancel_key_data, session_id, peer_addr).await
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
@@ -66,10 +73,11 @@ impl<P: CancellationPublisherMut> CancellationPublisher for Arc<Mutex<P>> {
|
||||
&self,
|
||||
cancel_key_data: CancelKeyData,
|
||||
session_id: Uuid,
|
||||
peer_addr: IpAddr,
|
||||
) -> anyhow::Result<()> {
|
||||
self.lock()
|
||||
.await
|
||||
.try_publish(cancel_key_data, session_id)
|
||||
.try_publish(cancel_key_data, session_id, peer_addr)
|
||||
.await
|
||||
}
|
||||
}
|
||||
@@ -97,11 +105,13 @@ impl RedisPublisherClient {
|
||||
&mut self,
|
||||
cancel_key_data: CancelKeyData,
|
||||
session_id: Uuid,
|
||||
peer_addr: IpAddr,
|
||||
) -> anyhow::Result<()> {
|
||||
let payload = serde_json::to_string(&Notification::Cancel(CancelSession {
|
||||
region_id: Some(self.region_id.clone()),
|
||||
cancel_key_data,
|
||||
session_id,
|
||||
peer_addr: Some(peer_addr),
|
||||
}))?;
|
||||
let _: () = self.client.publish(PROXY_CHANNEL_NAME, payload).await?;
|
||||
Ok(())
|
||||
@@ -120,13 +130,14 @@ impl RedisPublisherClient {
|
||||
&mut self,
|
||||
cancel_key_data: CancelKeyData,
|
||||
session_id: Uuid,
|
||||
peer_addr: IpAddr,
|
||||
) -> anyhow::Result<()> {
|
||||
// TODO: review redundant error duplication logs.
|
||||
if !self.limiter.check() {
|
||||
tracing::info!("Rate limit exceeded. Skipping cancellation message");
|
||||
return Err(anyhow::anyhow!("Rate limit exceeded"));
|
||||
}
|
||||
match self.publish(cancel_key_data, session_id).await {
|
||||
match self.publish(cancel_key_data, session_id, peer_addr).await {
|
||||
Ok(()) => return Ok(()),
|
||||
Err(e) => {
|
||||
tracing::error!("failed to publish a message: {e}");
|
||||
@@ -134,7 +145,7 @@ impl RedisPublisherClient {
|
||||
}
|
||||
tracing::info!("Publisher is disconnected. Reconnectiong...");
|
||||
self.try_connect().await?;
|
||||
self.publish(cancel_key_data, session_id).await
|
||||
self.publish(cancel_key_data, session_id, peer_addr).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -143,9 +154,13 @@ impl CancellationPublisherMut for RedisPublisherClient {
|
||||
&mut self,
|
||||
cancel_key_data: CancelKeyData,
|
||||
session_id: Uuid,
|
||||
peer_addr: IpAddr,
|
||||
) -> anyhow::Result<()> {
|
||||
tracing::info!("publishing cancellation key to Redis");
|
||||
match self.try_publish_internal(cancel_key_data, session_id).await {
|
||||
match self
|
||||
.try_publish_internal(cancel_key_data, session_id, peer_addr)
|
||||
.await
|
||||
{
|
||||
Ok(()) => {
|
||||
tracing::debug!("cancellation key successfuly published to Redis");
|
||||
Ok(())
|
||||
|
||||
@@ -60,6 +60,7 @@ pub(crate) struct CancelSession {
|
||||
pub(crate) region_id: Option<String>,
|
||||
pub(crate) cancel_key_data: CancelKeyData,
|
||||
pub(crate) session_id: Uuid,
|
||||
pub(crate) peer_addr: Option<std::net::IpAddr>,
|
||||
}
|
||||
|
||||
fn deserialize_json_string<'de, D, T>(deserializer: D) -> Result<T, D::Error>
|
||||
@@ -137,10 +138,20 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Remove unspecified peer_addr after the complete migration to the new format
|
||||
let peer_addr = cancel_session
|
||||
.peer_addr
|
||||
.unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED));
|
||||
// This instance of cancellation_handler doesn't have a RedisPublisherClient so it can't publish the message.
|
||||
match self
|
||||
.cancellation_handler
|
||||
.cancel_session(cancel_session.cancel_key_data, uuid::Uuid::nil())
|
||||
.cancel_session(
|
||||
cancel_session.cancel_key_data,
|
||||
uuid::Uuid::nil(),
|
||||
&peer_addr,
|
||||
cancel_session.peer_addr.is_some(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(()) => {}
|
||||
@@ -335,6 +346,7 @@ mod tests {
|
||||
cancel_key_data,
|
||||
region_id: None,
|
||||
session_id: uuid,
|
||||
peer_addr: None,
|
||||
});
|
||||
let text = serde_json::to_string(&msg)?;
|
||||
let result: Notification = serde_json::from_str(&text)?;
|
||||
@@ -344,6 +356,7 @@ mod tests {
|
||||
cancel_key_data,
|
||||
region_id: Some("region".to_string()),
|
||||
session_id: uuid,
|
||||
peer_addr: None,
|
||||
});
|
||||
let text = serde_json::to_string(&msg)?;
|
||||
let result: Notification = serde_json::from_str(&text)?;
|
||||
|
||||
Reference in New Issue
Block a user