Compare commits


2 Commits

Author         SHA1         Message      Date
Anna Khanova   de4449281d   Fix          2024-04-10 11:51:10 +02:00
Anna Khanova   2876eaba61   Proper fix   2024-04-10 11:48:44 +02:00
27 changed files with 422 additions and 344 deletions

Cargo.lock (generated)

@@ -4320,7 +4320,6 @@ dependencies = [
"hyper-util",
"ipnet",
"itertools",
"jsonwebtoken",
"lasso",
"md5",
"metrics",

@@ -6,8 +6,8 @@ use std::path::Path;
use anyhow::Result;
use crate::pg_helpers::escape_conf_value;
use crate::pg_helpers::{GenericOptionExt, PgOptionsSerialize};
use compute_api::spec::{ComputeMode, ComputeSpec, GenericOption};
use crate::pg_helpers::PgOptionsSerialize;
use compute_api::spec::{ComputeMode, ComputeSpec};
/// Check that `line` is inside a text file and put it there if it is not.
/// Create file if it doesn't exist.
@@ -92,27 +92,6 @@ pub fn write_postgres_conf(
}
}
if cfg!(target_os = "linux") {
// Check /proc/sys/vm/overcommit_memory -- if it equals 2 (i.e. linux memory overcommit is
// disabled), then the control plane has enabled swap and we should set
// dynamic_shared_memory_type = 'mmap'.
//
// This is (maybe?) temporary - for more, see https://github.com/neondatabase/cloud/issues/12047.
let overcommit_memory_contents = std::fs::read_to_string("/proc/sys/vm/overcommit_memory")
// ignore any errors - they can be expected in certain situations (e.g. when
// not running on Linux).
.unwrap_or_else(|_| String::new());
if overcommit_memory_contents.trim() == "2" {
let opt = GenericOption {
name: "dynamic_shared_memory_type".to_owned(),
value: Some("mmap".to_owned()),
vartype: "enum".to_owned(),
};
write!(file, "{}", opt.to_pg_setting())?;
}
}
// If there are any extra options in the 'settings' field, append those
if spec.cluster.settings.is_some() {
writeln!(file, "# Managed by compute_ctl: begin")?;

@@ -44,7 +44,7 @@ pub fn escape_conf_value(s: &str) -> String {
format!("'{}'", res)
}
pub trait GenericOptionExt {
trait GenericOptionExt {
fn to_pg_option(&self) -> String;
fn to_pg_setting(&self) -> String;
}

@@ -97,7 +97,6 @@ native-tls.workspace = true
postgres-native-tls.workspace = true
postgres-protocol.workspace = true
redis.workspace = true
jsonwebtoken.workspace = true
workspace_hack.workspace = true

@@ -13,8 +13,6 @@ mod password_hack;
pub use password_hack::parse_endpoint_param;
use password_hack::PasswordHackPayload;
pub mod caps;
mod flow;
pub use flow::*;
use tokio::time::error::Elapsed;
@@ -73,9 +71,6 @@ pub enum AuthErrorImpl {
#[error("Too many connections to this endpoint. Please try again later.")]
TooManyConnections,
#[error("neon_caps token is invalid")]
CapsInvalid,
#[error("Authentication timed out")]
UserTimeout(Elapsed),
}
@@ -101,10 +96,6 @@ impl AuthError {
AuthErrorImpl::TooManyConnections.into()
}
pub fn caps_invalid() -> Self {
AuthErrorImpl::CapsInvalid.into()
}
pub fn is_auth_failed(&self) -> bool {
matches!(self.0.as_ref(), AuthErrorImpl::AuthFailed(_))
}
@@ -135,7 +126,6 @@ impl UserFacingError for AuthError {
IpAddressNotAllowed(_) => self.to_string(),
TooManyConnections => self.to_string(),
UserTimeout(_) => self.to_string(),
CapsInvalid => self.to_string(),
}
}
}
@@ -155,7 +145,6 @@ impl ReportableError for AuthError {
IpAddressNotAllowed(_) => crate::error::ErrorKind::User,
TooManyConnections => crate::error::ErrorKind::RateLimit,
UserTimeout(_) => crate::error::ErrorKind::User,
CapsInvalid => crate::error::ErrorKind::User,
}
}
}

@@ -27,8 +27,7 @@ use crate::{
},
stream, url,
};
use crate::{scram, EndpointCacheKey, EndpointId, RoleName};
use std::net::IpAddr;
use crate::{scram, EndpointCacheKey, EndpointId, Normalize, RoleName};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, warn};
@@ -187,7 +186,7 @@ impl AuthenticationConfig {
is_cleartext: bool,
) -> auth::Result<AuthSecret> {
// we have validated the endpoint exists, so let's intern it.
let endpoint_int = EndpointIdInt::from(endpoint);
let endpoint_int = EndpointIdInt::from(endpoint.normalize());
// only count the full hash count if password hack or websocket flow.
// in other words, if proxy needs to run the hashing
@@ -252,13 +251,11 @@ async fn auth_quirks(
Ok(info) => (info, None),
};
let bypass_ipcheck = apply_caps(&config, &info, &ctx.peer_addr)?;
info!("fetching user's authentication info");
let (allowed_ips, maybe_secret) = api.get_allowed_ips_and_secret(ctx, &info).await?;
// check allowed list
if !bypass_ipcheck && !check_peer_addr_is_in_list(&ctx.peer_addr, &allowed_ips) {
if !check_peer_addr_is_in_list(&ctx.peer_addr, &allowed_ips) {
return Err(auth::AuthError::ip_address_not_allowed(ctx.peer_addr));
}
let cached_secret = match maybe_secret {
@@ -540,7 +537,6 @@ mod tests {
scram_protocol_timeout: std::time::Duration::from_secs(5),
rate_limiter_enabled: true,
rate_limiter: AuthRateLimiter::new(&RateBucketInfo::DEFAULT_AUTH_SET),
caps: None,
});
async fn read_message(r: &mut (impl AsyncRead + Unpin), b: &mut BytesMut) -> PgMessage {
@@ -699,43 +695,3 @@ mod tests {
handle.await.unwrap();
}
}
// Checks that the provided JWT capabilities are valid for the connection.
//
// If it returns Ok(true), further peer IP checks have to be disabled.
//
// If the proxy isn't configured for JWT capabilities, or the neon_caps option
// isn't set, it skips any checks.
pub fn apply_caps(
config: &AuthenticationConfig,
info: &ComputeUserInfo,
peer_addr: &IpAddr,
) -> auth::Result<bool> {
match (&config.caps, info.options.caps()) {
(Some(caps_config), Some(caps)) => {
let token = match caps_config.decode(&caps) {
Err(_) => {
return Err(auth::AuthError::caps_invalid());
}
Ok(token) => token,
};
if token.claims.endpoint_id != *info.endpoint {
return Err(auth::AuthError::caps_invalid());
}
match token.claims.check_ip(peer_addr) {
None => return Ok(false),
Some(true) => {
return Ok(true);
}
Some(false) => {
return Err(auth::AuthError::ip_address_not_allowed(*peer_addr));
}
}
}
_ => {
return Ok(false);
}
}
}

@@ -1,96 +0,0 @@
use std::{borrow::Cow, fmt::Display, fs, net::IpAddr};
use anyhow::Result;
use camino::Utf8Path;
use jsonwebtoken::{decode, Algorithm, DecodingKey, TokenData, Validation};
use serde::{Deserialize, Serialize};
use utils::http::error::ApiError;
use super::{check_peer_addr_is_in_list, IpPattern};
const TOKEN_ALGORITHM: Algorithm = Algorithm::EdDSA;
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Scope {
Connection,
}
#[derive(Debug, Deserialize, Clone, PartialEq)]
pub struct Claims {
pub scope: Scope,
pub allowed_ips: Option<Vec<IpPattern>>,
pub endpoint_id: String,
}
impl Claims {
pub fn check_ip(&self, ip: &IpAddr) -> Option<bool> {
let allowed_ips = match &self.allowed_ips {
None => return None,
Some(allowed_ips) => allowed_ips,
};
if allowed_ips.is_empty() {
return Some(true);
}
Some(check_peer_addr_is_in_list(ip, allowed_ips))
}
}
pub struct CapsValidator {
decoding_key: DecodingKey,
validation: Validation,
}
impl CapsValidator {
pub fn new(decoding_key: DecodingKey) -> Self {
let mut validation = Validation::default();
validation.algorithms = vec![TOKEN_ALGORITHM];
Self {
decoding_key,
validation,
}
}
pub fn from_key_path(key_path: &Utf8Path) -> Result<Self> {
let metadata = key_path.metadata()?;
let decoding_key = if metadata.is_file() {
let public_key = fs::read(key_path)?;
DecodingKey::from_ed_pem(&public_key)?
} else {
anyhow::bail!("path isn't a file")
};
Ok(Self::new(decoding_key))
}
pub fn decode(&self, token: &str) -> std::result::Result<TokenData<Claims>, CapsError> {
return match decode(token, &self.decoding_key, &self.validation) {
Ok(res) => Ok(res),
Err(e) => Err(CapsError(Cow::Owned(e.to_string()))),
};
}
}
impl std::fmt::Debug for CapsValidator {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CapsValidator")
.field("validation", &self.validation)
.finish()
}
}
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct CapsError(pub Cow<'static, str>);
impl Display for CapsError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl From<CapsError> for ApiError {
fn from(_value: CapsError) -> Self {
ApiError::Forbidden("neon_caps validation error".to_string())
}
}

@@ -5,11 +5,9 @@ use aws_config::meta::region::RegionProviderChain;
use aws_config::profile::ProfileFileCredentialsProvider;
use aws_config::provider_config::ProviderConfig;
use aws_config::web_identity_token::WebIdentityTokenCredentialsProvider;
use camino::Utf8Path;
use futures::future::Either;
use proxy::auth;
use proxy::auth::backend::MaybeOwned;
use proxy::auth::caps::CapsValidator;
use proxy::cancellation::CancelMap;
use proxy::cancellation::CancellationHandler;
use proxy::config::remote_storage_from_toml;
@@ -191,13 +189,12 @@ struct ProxyCliArgs {
/// cache for `project_info` (use `size=0` to disable)
#[clap(long, default_value = config::ProjectInfoCacheOptions::CACHE_DEFAULT_OPTIONS)]
project_info_cache: String,
/// cache for all valid endpoints
#[clap(long, default_value = config::EndpointCacheConfig::CACHE_DEFAULT_OPTIONS)]
endpoint_cache_config: String,
#[clap(flatten)]
parquet_upload: ParquetUploadArgs,
#[clap(long)]
caps_key: Option<String>,
/// interval for backup metric collection
#[clap(long, default_value = "10m", value_parser = humantime::parse_duration)]
metric_backup_collection_interval: std::time::Duration,
@@ -415,6 +412,9 @@ async fn main() -> anyhow::Result<()> {
args.region.clone(),
));
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
let cache = api.caches.endpoints_cache.clone();
let con = redis_notifications_client.clone();
maintenance_tasks.spawn(async move { cache.do_read(con).await });
}
}
}
@@ -494,14 +494,18 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
let wake_compute_cache_config: CacheOptions = args.wake_compute_cache.parse()?;
let project_info_cache_config: ProjectInfoCacheOptions =
args.project_info_cache.parse()?;
let endpoint_cache_config: config::EndpointCacheConfig =
args.endpoint_cache_config.parse()?;
info!("Using NodeInfoCache (wake_compute) with options={wake_compute_cache_config:?}");
info!(
"Using AllowedIpsCache (wake_compute) with options={project_info_cache_config:?}"
);
info!("Using EndpointCacheConfig with options={endpoint_cache_config:?}");
let caches = Box::leak(Box::new(console::caches::ApiCaches::new(
wake_compute_cache_config,
project_info_cache_config,
endpoint_cache_config,
)));
let config::WakeComputeLockOptions {
@@ -512,10 +516,10 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
} = args.wake_compute_lock.parse()?;
info!(permits, shards, ?epoch, "Using NodeLocks (wake_compute)");
let locks = Box::leak(Box::new(
console::locks::ApiLocks::new("wake_compute_lock", permits, shards, timeout)
console::locks::ApiLocks::new("wake_compute_lock", permits, shards, timeout, epoch)
.unwrap(),
));
tokio::spawn(locks.garbage_collect_worker(epoch));
tokio::spawn(locks.garbage_collect_worker());
let url = args.auth_endpoint.parse()?;
let endpoint = http::Endpoint::new(url, http::new_client(rate_limiter_config));
@@ -547,19 +551,10 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
max_total_conns: args.sql_over_http.sql_over_http_pool_max_total_conns,
},
};
let caps = if let Some(key) = &args.caps_key {
    let path = Utf8Path::new(key);
    Some(CapsValidator::from_key_path(path)?)
} else {
    None
};
let authentication_config = AuthenticationConfig {
scram_protocol_timeout: args.scram_protocol_timeout,
rate_limiter_enabled: args.auth_rate_limit_enabled,
rate_limiter: AuthRateLimiter::new(args.auth_rate_limit.clone()),
caps,
};
let mut endpoint_rps_limit = args.endpoint_rps_limit.clone();

@@ -1,4 +1,5 @@
pub mod common;
pub mod endpoints;
pub mod project_info;
mod timed_lru;

proxy/src/cache/endpoints.rs (new file)

@@ -0,0 +1,190 @@
use std::{
convert::Infallible,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use dashmap::DashSet;
use redis::{
streams::{StreamReadOptions, StreamReadReply},
AsyncCommands, FromRedisValue, Value,
};
use serde::Deserialize;
use tokio::sync::Mutex;
use crate::{
config::EndpointCacheConfig,
context::RequestMonitoring,
intern::{BranchIdInt, EndpointIdInt, ProjectIdInt},
metrics::REDIS_BROKEN_MESSAGES,
rate_limiter::GlobalRateLimiter,
redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider,
EndpointId,
};
#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all(deserialize = "snake_case"))]
pub enum ControlPlaneEventKey {
EndpointCreated,
BranchCreated,
ProjectCreated,
}
pub struct EndpointsCache {
config: EndpointCacheConfig,
endpoints: DashSet<EndpointIdInt>,
branches: DashSet<BranchIdInt>,
projects: DashSet<ProjectIdInt>,
ready: AtomicBool,
limiter: Arc<Mutex<GlobalRateLimiter>>,
}
impl EndpointsCache {
pub fn new(config: EndpointCacheConfig) -> Self {
Self {
limiter: Arc::new(Mutex::new(GlobalRateLimiter::new(
config.limiter_info.clone(),
))),
config,
endpoints: DashSet::new(),
branches: DashSet::new(),
projects: DashSet::new(),
ready: AtomicBool::new(false),
}
}
pub async fn is_valid(&self, ctx: &mut RequestMonitoring, endpoint: &EndpointId) -> bool {
if !self.ready.load(Ordering::Acquire) {
return true;
}
// If cache is disabled, just collect the metrics and return.
if self.config.disable_cache {
ctx.set_rejected(self.should_reject(endpoint));
return true;
}
// If the limiter allows, we don't need to check the cache.
if self.limiter.lock().await.check() {
return true;
}
let rejected = self.should_reject(endpoint);
ctx.set_rejected(rejected);
!rejected
}
fn should_reject(&self, endpoint: &EndpointId) -> bool {
if endpoint.is_endpoint() {
!self.endpoints.contains(&EndpointIdInt::from(endpoint))
} else if endpoint.is_branch() {
!self
.branches
.contains(&BranchIdInt::from(&endpoint.as_branch()))
} else {
!self
.projects
.contains(&ProjectIdInt::from(&endpoint.as_project()))
}
}
fn insert_event(&self, key: ControlPlaneEventKey, value: String) {
// Do not do normalization here, we expect the events to be normalized.
match key {
ControlPlaneEventKey::EndpointCreated => {
self.endpoints.insert(EndpointIdInt::from(&value.into()));
}
ControlPlaneEventKey::BranchCreated => {
self.branches.insert(BranchIdInt::from(&value.into()));
}
ControlPlaneEventKey::ProjectCreated => {
self.projects.insert(ProjectIdInt::from(&value.into()));
}
}
}
pub async fn do_read(
&self,
mut con: ConnectionWithCredentialsProvider,
) -> anyhow::Result<Infallible> {
let mut last_id = "0-0".to_string();
loop {
self.ready.store(false, Ordering::Release);
if let Err(e) = con.connect().await {
tracing::error!("error connecting to redis: {:?}", e);
continue;
}
if let Err(e) = self.read_from_stream(&mut con, &mut last_id).await {
tracing::error!("error reading from redis: {:?}", e);
}
}
}
async fn read_from_stream(
&self,
con: &mut ConnectionWithCredentialsProvider,
last_id: &mut String,
) -> anyhow::Result<()> {
tracing::info!("reading endpoints/branches/projects from redis");
self.batch_read(
con,
StreamReadOptions::default().count(self.config.initial_batch_size),
last_id,
true,
)
.await?;
tracing::info!("ready to filter user requests");
self.ready.store(true, Ordering::Release);
self.batch_read(
con,
StreamReadOptions::default()
.count(self.config.initial_batch_size)
.block(self.config.xread_timeout.as_millis() as usize),
last_id,
false,
)
.await
}
fn parse_key_value(key: &str, value: &Value) -> anyhow::Result<(ControlPlaneEventKey, String)> {
Ok((serde_json::from_str(key)?, String::from_redis_value(value)?))
}
async fn batch_read(
&self,
conn: &mut ConnectionWithCredentialsProvider,
opts: StreamReadOptions,
last_id: &mut String,
return_when_finish: bool,
) -> anyhow::Result<()> {
let mut total: usize = 0;
loop {
let mut res: StreamReadReply = conn
.xread_options(&[&self.config.stream_name], &[last_id.as_str()], &opts)
.await?;
if res.keys.len() != 1 {
anyhow::bail!("Cannot read from redis stream {}", self.config.stream_name);
}
let res = res.keys.pop().expect("Checked length above");
if return_when_finish && res.ids.len() <= self.config.default_batch_size {
break;
}
for x in res.ids {
total += 1;
for (k, v) in x.map {
let (key, value) = match Self::parse_key_value(&k, &v) {
Ok(x) => x,
Err(e) => {
REDIS_BROKEN_MESSAGES
.with_label_values(&[&self.config.stream_name])
.inc();
tracing::error!("error parsing key-value {k}-{v:?}: {e:?}");
continue;
}
};
self.insert_event(key, value);
}
if total.is_power_of_two() {
tracing::debug!("endpoints read {}", total);
}
*last_id = x.id;
}
}
tracing::info!("read {} endpoints/branches/projects from redis", total);
Ok(())
}
}
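Taken together with the main.rs and neon.rs hunks in this compare, the intended wiring is roughly the following (a sketch, not part of the diff; the surrounding names are taken from those hunks):

    // One maintenance task replays the control-plane Redis stream into the cache:
    let cache = api.caches.endpoints_cache.clone();
    let con = redis_notifications_client.clone();
    maintenance_tasks.spawn(async move { cache.do_read(con).await });

    // ...while the request path consults the cache before calling cplane:
    if !caches.endpoints_cache.is_valid(ctx, &user_info.endpoint.normalize()).await {
        info!("endpoint is not valid, skipping the request");
        return Ok(AuthInfo::default());
    }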

@@ -1,5 +1,5 @@
use crate::{
auth::{self, caps::CapsValidator},
auth,
rate_limiter::{AuthRateLimiter, RateBucketInfo},
serverless::GlobalConnPoolOptions,
};
@@ -58,7 +58,6 @@ pub struct AuthenticationConfig {
pub scram_protocol_timeout: tokio::time::Duration,
pub rate_limiter_enabled: bool,
pub rate_limiter: AuthRateLimiter,
pub caps: Option<CapsValidator>,
}
impl TlsConfig {
@@ -314,6 +313,75 @@ impl CertResolver {
}
}
#[derive(Debug)]
pub struct EndpointCacheConfig {
/// Batch size used to receive all endpoints on startup.
pub initial_batch_size: usize,
/// Batch size used to receive endpoints afterwards.
pub default_batch_size: usize,
/// Timeout for the stream read operation.
pub xread_timeout: Duration,
/// Stream name to read from.
pub stream_name: String,
/// Limiter info (used to decide when to enable the cache).
pub limiter_info: Vec<RateBucketInfo>,
/// Disable the cache.
/// If true, the cache is ignored, but all statistics are still reported.
pub disable_cache: bool,
}
impl EndpointCacheConfig {
/// Default options for the endpoints cache.
/// Notice that `disable_cache=true` by default, so the cache is disabled and only collects statistics.
pub const CACHE_DEFAULT_OPTIONS: &'static str =
"initial_batch_size=1000,default_batch_size=10,xread_timeout=5m,stream_name=controlPlane,disable_cache=true,limiter_info=1000@1s";
/// Parse cache options passed via cmdline.
/// Example: [`Self::CACHE_DEFAULT_OPTIONS`].
fn parse(options: &str) -> anyhow::Result<Self> {
let mut initial_batch_size = None;
let mut default_batch_size = None;
let mut xread_timeout = None;
let mut stream_name = None;
let mut limiter_info = vec![];
let mut disable_cache = false;
for option in options.split(',') {
let (key, value) = option
.split_once('=')
.with_context(|| format!("bad key-value pair: {option}"))?;
match key {
"initial_batch_size" => initial_batch_size = Some(value.parse()?),
"default_batch_size" => default_batch_size = Some(value.parse()?),
"xread_timeout" => xread_timeout = Some(humantime::parse_duration(value)?),
"stream_name" => stream_name = Some(value.to_string()),
"limiter_info" => limiter_info.push(RateBucketInfo::from_str(value)?),
"disable_cache" => disable_cache = value.parse()?,
unknown => bail!("unknown key: {unknown}"),
}
}
RateBucketInfo::validate(&mut limiter_info)?;
Ok(Self {
initial_batch_size: initial_batch_size.context("missing `initial_batch_size`")?,
default_batch_size: default_batch_size.context("missing `default_batch_size`")?,
xread_timeout: xread_timeout.context("missing `xread_timeout`")?,
stream_name: stream_name.context("missing `stream_name`")?,
disable_cache,
limiter_info,
})
}
}
impl FromStr for EndpointCacheConfig {
type Err = anyhow::Error;
fn from_str(options: &str) -> Result<Self, Self::Err> {
let error = || format!("failed to parse endpoint cache options '{options}'");
Self::parse(options).with_context(error)
}
}
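As a usage sketch (not part of the diff): the cache options arrive as a single comma-separated string, e.g. via the `--endpoint-cache-config` flag added in main.rs, and are parsed through this FromStr impl:

    use std::str::FromStr;

    // Sketch: parse a custom option string of the same shape as
    // CACHE_DEFAULT_OPTIONS above; unknown keys or missing required keys fail.
    let cfg = EndpointCacheConfig::from_str(
        "initial_batch_size=500,default_batch_size=10,xread_timeout=1m,stream_name=controlPlane,disable_cache=false,limiter_info=100@1s",
    ).expect("valid endpoint cache options");
    assert_eq!(cfg.default_batch_size, 10);
    assert!(!cfg.disable_cache);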
#[derive(Debug)]
pub struct MetricBackupCollectionConfig {
pub interval: Duration,

@@ -8,15 +8,15 @@ use crate::{
backend::{ComputeCredentialKeys, ComputeUserInfo},
IpPattern,
},
cache::{project_info::ProjectInfoCacheImpl, Cached, TimedLru},
cache::{endpoints::EndpointsCache, project_info::ProjectInfoCacheImpl, Cached, TimedLru},
compute,
config::{CacheOptions, ProjectInfoCacheOptions},
config::{CacheOptions, EndpointCacheConfig, ProjectInfoCacheOptions},
context::RequestMonitoring,
intern::ProjectIdInt,
scram, EndpointCacheKey,
};
use dashmap::DashMap;
use std::{sync::Arc, time::Duration};
use std::{convert::Infallible, sync::Arc, time::Duration};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::Instant;
use tracing::info;
@@ -416,12 +416,15 @@ pub struct ApiCaches {
pub node_info: NodeInfoCache,
/// Cache which stores project_id -> endpoint_ids mapping.
pub project_info: Arc<ProjectInfoCacheImpl>,
/// List of all valid endpoints.
pub endpoints_cache: Arc<EndpointsCache>,
}
impl ApiCaches {
pub fn new(
wake_compute_cache_config: CacheOptions,
project_info_cache_config: ProjectInfoCacheOptions,
endpoint_cache_config: EndpointCacheConfig,
) -> Self {
Self {
node_info: NodeInfoCache::new(
@@ -431,6 +434,7 @@ impl ApiCaches {
true,
),
project_info: Arc::new(ProjectInfoCacheImpl::new(project_info_cache_config)),
endpoints_cache: Arc::new(EndpointsCache::new(endpoint_cache_config)),
}
}
}
@@ -441,6 +445,7 @@ pub struct ApiLocks {
node_locks: DashMap<EndpointCacheKey, Arc<Semaphore>>,
permits: usize,
timeout: Duration,
epoch: std::time::Duration,
registered: prometheus::IntCounter,
unregistered: prometheus::IntCounter,
reclamation_lag: prometheus::Histogram,
@@ -453,6 +458,7 @@ impl ApiLocks {
permits: usize,
shards: usize,
timeout: Duration,
epoch: std::time::Duration,
) -> prometheus::Result<Self> {
let registered = prometheus::IntCounter::with_opts(
prometheus::Opts::new(
@@ -497,6 +503,7 @@ impl ApiLocks {
node_locks: DashMap::with_shard_amount(shards),
permits,
timeout,
epoch,
lock_acquire_lag,
registered,
unregistered,
@@ -536,12 +543,12 @@ impl ApiLocks {
})
}
pub async fn garbage_collect_worker(&self, epoch: std::time::Duration) {
pub async fn garbage_collect_worker(&self) {
if self.permits == 0 {
return;
}
let mut interval = tokio::time::interval(epoch / (self.node_locks.shards().len()) as u32);
let mut interval =
tokio::time::interval(self.epoch / (self.node_locks.shards().len()) as u32);
loop {
for (i, shard) in self.node_locks.shards().iter().enumerate() {
interval.tick().await;

@@ -8,6 +8,7 @@ use super::{
};
use crate::{
auth::backend::ComputeUserInfo, compute, console::messages::ColdStartInfo, http, scram,
Normalize,
};
use crate::{
cache::Cached,
@@ -23,7 +24,7 @@ use tracing::{error, info, info_span, warn, Instrument};
pub struct Api {
endpoint: http::Endpoint,
pub caches: &'static ApiCaches,
locks: &'static ApiLocks,
pub locks: &'static ApiLocks,
jwt: String,
}
@@ -55,6 +56,15 @@ impl Api {
ctx: &mut RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<AuthInfo, GetAuthInfoError> {
if !self
.caches
.endpoints_cache
.is_valid(ctx, &user_info.endpoint.normalize())
.await
{
info!("endpoint is not valid, skipping the request");
return Ok(AuthInfo::default());
}
let request_id = ctx.session_id.to_string();
let application_name = ctx.console_application_name();
async {
@@ -81,7 +91,9 @@ impl Api {
Ok(body) => body,
// Error 404 is special: it's ok not to have a secret.
Err(e) => match e.http_status_code() {
Some(http::StatusCode::NOT_FOUND) => return Ok(AuthInfo::default()),
Some(http::StatusCode::NOT_FOUND) => {
return Ok(AuthInfo::default());
}
_otherwise => return Err(e.into()),
},
};
@@ -174,23 +186,27 @@ impl super::Api for Api {
ctx: &mut RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<CachedRoleSecret, GetAuthInfoError> {
let ep = &user_info.endpoint;
let normalized_ep = &user_info.endpoint.normalize();
let user = &user_info.user;
if let Some(role_secret) = self.caches.project_info.get_role_secret(ep, user) {
if let Some(role_secret) = self
.caches
.project_info
.get_role_secret(normalized_ep, user)
{
return Ok(role_secret);
}
let auth_info = self.do_get_auth_info(ctx, user_info).await?;
if let Some(project_id) = auth_info.project_id {
let ep_int = ep.into();
let normalized_ep_int = normalized_ep.into();
self.caches.project_info.insert_role_secret(
project_id,
ep_int,
normalized_ep_int,
user.into(),
auth_info.secret.clone(),
);
self.caches.project_info.insert_allowed_ips(
project_id,
ep_int,
normalized_ep_int,
Arc::new(auth_info.allowed_ips),
);
ctx.set_project_id(project_id);
@@ -204,8 +220,8 @@ impl super::Api for Api {
ctx: &mut RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), GetAuthInfoError> {
let ep = &user_info.endpoint;
if let Some(allowed_ips) = self.caches.project_info.get_allowed_ips(ep) {
let normalized_ep = &user_info.endpoint.normalize();
if let Some(allowed_ips) = self.caches.project_info.get_allowed_ips(normalized_ep) {
ALLOWED_IPS_BY_CACHE_OUTCOME
.with_label_values(&["hit"])
.inc();
@@ -218,16 +234,18 @@ impl super::Api for Api {
let allowed_ips = Arc::new(auth_info.allowed_ips);
let user = &user_info.user;
if let Some(project_id) = auth_info.project_id {
let ep_int = ep.into();
let normalized_ep_int = normalized_ep.into();
self.caches.project_info.insert_role_secret(
project_id,
ep_int,
normalized_ep_int,
user.into(),
auth_info.secret.clone(),
);
self.caches
.project_info
.insert_allowed_ips(project_id, ep_int, allowed_ips.clone());
self.caches.project_info.insert_allowed_ips(
project_id,
normalized_ep_int,
allowed_ips.clone(),
);
ctx.set_project_id(project_id);
}
Ok((

@@ -12,7 +12,9 @@ use crate::{
console::messages::{ColdStartInfo, MetricsAuxInfo},
error::ErrorKind,
intern::{BranchIdInt, ProjectIdInt},
metrics::{LatencyTimer, ENDPOINT_ERRORS_BY_KIND, ERROR_BY_KIND},
metrics::{
bool_to_str, LatencyTimer, ENDPOINT_ERRORS_BY_KIND, ERROR_BY_KIND, NUM_INVALID_ENDPOINTS,
},
DbName, EndpointId, RoleName,
};
@@ -50,6 +52,8 @@ pub struct RequestMonitoring {
// This sender is here to keep the request monitoring channel open while requests are taking place.
sender: Option<mpsc::UnboundedSender<RequestData>>,
pub latency_timer: LatencyTimer,
// Whether the proxy decided that this is not a valid endpoint and rejected it before going to cplane.
rejected: bool,
}
#[derive(Clone, Debug)]
@@ -93,6 +97,7 @@ impl RequestMonitoring {
error_kind: None,
auth_method: None,
success: false,
rejected: false,
cold_start_info: ColdStartInfo::Unknown,
sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
@@ -113,6 +118,10 @@ impl RequestMonitoring {
)
}
pub fn set_rejected(&mut self, rejected: bool) {
self.rejected = rejected;
}
pub fn set_cold_start_info(&mut self, info: ColdStartInfo) {
self.cold_start_info = info;
self.latency_timer.cold_start_info(info);
@@ -178,6 +187,10 @@ impl RequestMonitoring {
impl Drop for RequestMonitoring {
fn drop(&mut self) {
let outcome = if self.success { "success" } else { "failure" };
NUM_INVALID_ENDPOINTS
.with_label_values(&[self.protocol, bool_to_str(self.rejected), outcome])
.inc();
if let Some(tx) = self.sender.take() {
let _: Result<(), _> = tx.send(RequestData::from(&*self));
}

@@ -160,6 +160,11 @@ impl From<&EndpointId> for EndpointIdInt {
EndpointIdTag::get_interner().get_or_intern(value)
}
}
impl From<EndpointId> for EndpointIdInt {
fn from(value: EndpointId) -> Self {
EndpointIdTag::get_interner().get_or_intern(&value)
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct BranchIdTag;
@@ -175,6 +180,11 @@ impl From<&BranchId> for BranchIdInt {
BranchIdTag::get_interner().get_or_intern(value)
}
}
impl From<BranchId> for BranchIdInt {
fn from(value: BranchId) -> Self {
BranchIdTag::get_interner().get_or_intern(&value)
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct ProjectIdTag;
@@ -190,6 +200,11 @@ impl From<&ProjectId> for ProjectIdInt {
ProjectIdTag::get_interner().get_or_intern(value)
}
}
impl From<ProjectId> for ProjectIdInt {
fn from(value: ProjectId) -> Self {
ProjectIdTag::get_interner().get_or_intern(&value)
}
}
#[cfg(test)]
mod tests {

@@ -127,6 +127,24 @@ macro_rules! smol_str_wrapper {
};
}
const POOLER_SUFFIX: &str = "-pooler";
pub trait Normalize {
fn normalize(&self) -> Self;
}
impl<S: Clone + AsRef<str> + From<String>> Normalize for S {
fn normalize(&self) -> Self {
if self.as_ref().ends_with(POOLER_SUFFIX) {
let mut s = self.as_ref().to_string();
s.truncate(s.len() - POOLER_SUFFIX.len());
s.into()
} else {
self.clone()
}
}
}
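A quick behavioral sketch (not in the diff; the blanket impl only needs Clone + AsRef<str> + From<String>, and the asserts additionally assume the smol_str wrappers derive PartialEq):

    // "-pooler" is stripped from the end; every other name passes through unchanged.
    let pooled = EndpointId::from("ep-steep-wave-123456-pooler".to_string());
    assert_eq!(pooled.normalize(), EndpointId::from("ep-steep-wave-123456".to_string()));

    let plain = EndpointId::from("ep-steep-wave-123456".to_string());
    assert_eq!(plain.normalize(), plain);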
// 90% of role name strings are 20 characters or less.
smol_str_wrapper!(RoleName);
// 50% of endpoint strings are 23 characters or less.
@@ -140,3 +158,22 @@ smol_str_wrapper!(ProjectId);
smol_str_wrapper!(EndpointCacheKey);
smol_str_wrapper!(DbName);
// Endpoints are a bit tricky. Rarely, they might be branches or projects.
impl EndpointId {
pub fn is_endpoint(&self) -> bool {
self.0.starts_with("ep-")
}
pub fn is_branch(&self) -> bool {
self.0.starts_with("br-")
}
pub fn is_project(&self) -> bool {
!self.is_endpoint() && !self.is_branch()
}
pub fn as_branch(&self) -> BranchId {
BranchId(self.0.clone())
}
pub fn as_project(&self) -> ProjectId {
ProjectId(self.0.clone())
}
}
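For illustration (hypothetical values): the prefix decides which of the three sets in EndpointsCache::should_reject the incoming id is checked against:

    let id = EndpointId::from("br-lingering-dew-012345".to_string());
    assert!(id.is_branch() && !id.is_endpoint() && !id.is_project());

    // Anything starting with neither "ep-" nor "br-" is treated as a project name.
    let id = EndpointId::from("withered-moon-654321".to_string());
    assert!(id.is_project());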

@@ -169,6 +169,18 @@ pub static NUM_CANCELLATION_REQUESTS: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});
pub static NUM_INVALID_ENDPOINTS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_invalid_endpoints_total",
"Number of invalid endpoints (per protocol, per rejected).",
// http/ws/tcp, true/false, success/failure
// TODO(anna): the last dimension is just a proxy to what we actually want to measure.
// We need to measure whether the endpoint was found by cplane or not.
&["protocol", "rejected", "outcome"],
)
.unwrap()
});
pub const NUM_CANCELLATION_REQUESTS_SOURCE_FROM_CLIENT: &str = "from_client";
pub const NUM_CANCELLATION_REQUESTS_SOURCE_FROM_REDIS: &str = "from_redis";

@@ -20,7 +20,7 @@ use crate::{
proxy::handshake::{handshake, HandshakeData},
rate_limiter::EndpointRateLimiter,
stream::{PqStream, Stream},
EndpointCacheKey,
EndpointCacheKey, Normalize,
};
use futures::TryFutureExt;
use itertools::Itertools;
@@ -280,7 +280,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
// check rate limit
if let Some(ep) = user_info.get_endpoint() {
if !endpoint_rate_limiter.check(ep, 1) {
if !endpoint_rate_limiter.check(ep.normalize(), 1) {
return stream
.throw_error(auth::AuthError::too_many_connections())
.await?;
@@ -385,10 +385,6 @@ impl NeonOptions {
!self.0.is_empty()
}
pub fn caps(&self) -> Option<&str> {
self.0.iter().find(|(k, _)| k == "caps").map(|(_, v)| &**v)
}
fn parse_from_iter<'a>(options: impl Iterator<Item = &'a str>) -> Self {
let mut options = options
.filter_map(neon_option)
@@ -402,13 +398,7 @@ impl NeonOptions {
// prefix + format!(" {k}:{v}")
// kinda jank because SmolStr is immutable
std::iter::once(prefix)
// exclude caps from cache key
.chain(
self.0
.iter()
.filter(|(k, _)| k != "caps")
.flat_map(|(k, v)| [" ", &**k, ":", &**v]),
)
.chain(self.0.iter().flat_map(|(k, v)| [" ", &**k, ":", &**v]))
.collect::<SmolStr>()
.into()
}

@@ -4,4 +4,4 @@ mod limiter;
pub use aimd::Aimd;
pub use limit_algorithm::{AimdConfig, Fixed, RateLimitAlgorithm, RateLimiterConfig};
pub use limiter::Limiter;
pub use limiter::{AuthRateLimiter, EndpointRateLimiter, RateBucketInfo, RedisRateLimiter};
pub use limiter::{AuthRateLimiter, EndpointRateLimiter, GlobalRateLimiter, RateBucketInfo};

@@ -24,13 +24,13 @@ use super::{
RateLimiterConfig,
};
pub struct RedisRateLimiter {
pub struct GlobalRateLimiter {
data: Vec<RateBucket>,
info: &'static [RateBucketInfo],
info: Vec<RateBucketInfo>,
}
impl RedisRateLimiter {
pub fn new(info: &'static [RateBucketInfo]) -> Self {
impl GlobalRateLimiter {
pub fn new(info: Vec<RateBucketInfo>) -> Self {
Self {
data: vec![
RateBucket {
@@ -50,7 +50,7 @@ impl RedisRateLimiter {
let should_allow_request = self
.data
.iter_mut()
.zip(self.info)
.zip(&self.info)
.all(|(bucket, info)| bucket.should_allow_request(info, now, 1));
if should_allow_request {

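A minimal usage sketch of the renamed limiter (assumes RateBucketInfo::from_str, which EndpointCacheConfig::parse above also relies on, and the no-argument check() used by EndpointsCache::is_valid):

    use std::str::FromStr;
    use crate::rate_limiter::{GlobalRateLimiter, RateBucketInfo};

    // Sketch: a limiter that admits up to 100 checks per 1-second bucket.
    let mut limiter = GlobalRateLimiter::new(vec![
        RateBucketInfo::from_str("100@1s").expect("valid bucket"),
    ]);
    if limiter.check() {
        // Under the limit - e.g. is_valid() skips the cache lookup here.
    }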
@@ -5,7 +5,7 @@ use redis::AsyncCommands;
use tokio::sync::Mutex;
use uuid::Uuid;
use crate::rate_limiter::{RateBucketInfo, RedisRateLimiter};
use crate::rate_limiter::{GlobalRateLimiter, RateBucketInfo};
use super::{
connection_with_credentials_provider::ConnectionWithCredentialsProvider,
@@ -80,7 +80,7 @@ impl<P: CancellationPublisherMut> CancellationPublisher for Arc<Mutex<P>> {
pub struct RedisPublisherClient {
client: ConnectionWithCredentialsProvider,
region_id: String,
limiter: RedisRateLimiter,
limiter: GlobalRateLimiter,
}
impl RedisPublisherClient {
@@ -92,7 +92,7 @@ impl RedisPublisherClient {
Ok(Self {
client,
region_id,
limiter: RedisRateLimiter::new(info),
limiter: GlobalRateLimiter::new(info.into()),
})
}

@@ -4,10 +4,7 @@ use async_trait::async_trait;
use tracing::{field::display, info};
use crate::{
auth::{
backend::{apply_caps, ComputeCredentials},
check_peer_addr_is_in_list, AuthError,
},
auth::{backend::ComputeCredentials, check_peer_addr_is_in_list, AuthError},
compute,
config::ProxyConfig,
console::{
@@ -34,15 +31,8 @@ impl PoolingBackend {
) -> Result<ComputeCredentials, AuthError> {
let user_info = conn_info.user_info.clone();
let backend = self.config.auth_backend.as_ref().map(|_| user_info.clone());
let bypass_ipcheck = apply_caps(
&self.config.authentication_config,
&user_info,
&ctx.peer_addr,
)?;
let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?;
if !bypass_ipcheck && !check_peer_addr_is_in_list(&ctx.peer_addr, &allowed_ips) {
if !check_peer_addr_is_in_list(&ctx.peer_addr, &allowed_ips) {
return Err(AuthError::ip_address_not_allowed(ctx.peer_addr));
}
let cached_secret = match maybe_secret {

@@ -1,84 +0,0 @@
import asyncio
import time
from pathlib import Path
from typing import Iterator
import pytest
from fixtures.neon_fixtures import (
PSQL,
NeonProxy,
)
from fixtures.port_distributor import PortDistributor
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.response import Response
def waiting_handler(status_code: int) -> Response:
# Wait longer than the timeout to make sure that both (two) connections are open.
# It would be better to use a barrier here, but I don't know how to do that together with pytest-httpserver.
time.sleep(2)
return Response(status=status_code)
@pytest.fixture(scope="function")
def proxy_with_rate_limit(
port_distributor: PortDistributor,
neon_binpath: Path,
httpserver_listen_address,
test_output_dir: Path,
) -> Iterator[NeonProxy]:
"""Neon proxy that routes directly to vanilla postgres."""
proxy_port = port_distributor.get_port()
mgmt_port = port_distributor.get_port()
http_port = port_distributor.get_port()
external_http_port = port_distributor.get_port()
(host, port) = httpserver_listen_address
endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
with NeonProxy(
neon_binpath=neon_binpath,
test_output_dir=test_output_dir,
proxy_port=proxy_port,
http_port=http_port,
mgmt_port=mgmt_port,
external_http_port=external_http_port,
auth_backend=NeonProxy.Console(endpoint, fixed_rate_limit=5),
) as proxy:
proxy.start()
yield proxy
@pytest.mark.asyncio
async def test_proxy_rate_limit(
httpserver: HTTPServer,
proxy_with_rate_limit: NeonProxy,
):
uri = "/billing/api/v1/usage_events/proxy_get_role_secret"
# mock control plane service
httpserver.expect_ordered_request(uri, method="GET").respond_with_handler(
lambda _: Response(status=200)
)
httpserver.expect_ordered_request(uri, method="GET").respond_with_handler(
lambda _: waiting_handler(429)
)
httpserver.expect_ordered_request(uri, method="GET").respond_with_handler(
lambda _: waiting_handler(500)
)
psql = PSQL(host=proxy_with_rate_limit.host, port=proxy_with_rate_limit.proxy_port)
f = await psql.run("select 42;")
await proxy_with_rate_limit.find_auth_link(uri, f)
# Limit should be 2.
# Run two queries in parallel.
f1, f2 = await asyncio.gather(psql.run("select 42;"), psql.run("select 42;"))
await proxy_with_rate_limit.find_auth_link(uri, f1)
await proxy_with_rate_limit.find_auth_link(uri, f2)
# Now limit should be 0.
f = await psql.run("select 42;")
await proxy_with_rate_limit.find_auth_link(uri, f)
# The last query shouldn't reach the http-server.
assert httpserver.assertions == []

@@ -1,5 +1,5 @@
{
"postgres-v16": "261497dd63ace434045058b1453bcbaaa83f23e5",
"postgres-v15": "85d809c124a898847a97d66a211f7d5ef4f8e0cb",
"postgres-v14": "d9149dc59abcbeeb26293707509aef51752db28f"
"postgres-v16": "3946b2e2ea71d07af092099cb5bcae76a69b90d6",
"postgres-v15": "64b8c7bccc6b77e04795e2d4cf6ad82dc8d987ed",
"postgres-v14": "a7b4c66156bce00afa60e5592d4284ba9e40b4cf"
}