Compare commits

...

8 Commits

Author SHA1 Message Date
Konstantin Knizhnik
d05cefaae1 Fix mistyping 2025-06-12 08:07:17 +03:00
Konstantin Knizhnik
fad69227d6 Fix bug in neon_redo_read_buffer_filter 2025-06-11 20:20:54 +03:00
Konstantin Knizhnik
4a397638bf Prohibit partial redo of wal records: if record affects several pages then either all of them are reconstructed, either all skipped 2025-06-11 18:19:04 +03:00
Konstantin Knizhnik
b34648c136 Remove XLogWaitForReplayOf at replica to avoid deadlock 2025-06-11 17:56:25 +03:00
Konstantin Knizhnik
24038033bf Remove default from DROP FUNCTION (#12202)
## Problem

DROP FUNCTION doesn't allow to specify default for parameters.

## Summary of changes

Remove DEFAULT clause from pgxn/neon/neon--1.6--1.5.sql

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-06-11 13:16:58 +00:00
Mikhail
1b935b1958 endpoint_storage: add ?from_endpoint= to /lfc/prewarm (#12195)
Related: https://github.com/neondatabase/cloud/issues/24225
Add optional from_endpoint parameter to allow prewarming from other
endpoint
2025-06-10 19:25:32 +00:00
a-masterov
3f16ca2c18 Respect limits for projects for the Random Operations test (#12184)
## Problem
The project limits were not respected, resulting in errors.
## Summary of changes
Now limits are checked before running an action, and if the action is
not possible to run, another random action will be run.

---------

Co-authored-by: Peter Bendel <peterbendel@neon.tech>
2025-06-10 15:59:51 +00:00
Conrad Ludgate
67b94c5992 [proxy] per endpoint configuration for rate limits (#12148)
https://github.com/neondatabase/cloud/issues/28333

Adds a new `rate_limit` response type to EndpointAccessControl, uses it
for rate limiting, and adds a generic invalidation for the cache.
2025-06-10 14:26:08 +00:00
22 changed files with 272 additions and 112 deletions

16
Cargo.lock generated
View File

@@ -753,6 +753,7 @@ dependencies = [
"axum",
"axum-core",
"bytes",
"form_urlencoded",
"futures-util",
"headers",
"http 1.1.0",
@@ -761,6 +762,8 @@ dependencies = [
"mime",
"pin-project-lite",
"serde",
"serde_html_form",
"serde_path_to_error",
"tower 0.5.2",
"tower-layer",
"tower-service",
@@ -6422,6 +6425,19 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "serde_html_form"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d2de91cf02bbc07cde38891769ccd5d4f073d22a40683aa4bc7a95781aaa2c4"
dependencies = [
"form_urlencoded",
"indexmap 2.9.0",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "serde_json"
version = "1.0.125"

View File

@@ -71,7 +71,7 @@ aws-credential-types = "1.2.0"
aws-sigv4 = { version = "1.2", features = ["sign-http"] }
aws-types = "1.3"
axum = { version = "0.8.1", features = ["ws"] }
axum-extra = { version = "0.10.0", features = ["typed-header"] }
axum-extra = { version = "0.10.0", features = ["typed-header", "query"] }
base64 = "0.13.0"
bincode = "1.3"
bindgen = "0.71"

View File

@@ -785,7 +785,7 @@ impl ComputeNode {
self.spawn_extension_stats_task();
if pspec.spec.autoprewarm {
self.prewarm_lfc();
self.prewarm_lfc(None);
}
Ok(())
}

View File

@@ -25,11 +25,16 @@ struct EndpointStoragePair {
}
const KEY: &str = "lfc_state";
impl TryFrom<&crate::compute::ParsedSpec> for EndpointStoragePair {
type Error = anyhow::Error;
fn try_from(pspec: &crate::compute::ParsedSpec) -> Result<Self, Self::Error> {
let Some(ref endpoint_id) = pspec.spec.endpoint_id else {
bail!("pspec.endpoint_id missing")
impl EndpointStoragePair {
/// endpoint_id is set to None while prewarming from other endpoint, see replica promotion
/// If not None, takes precedence over pspec.spec.endpoint_id
fn from_spec_and_endpoint(
pspec: &crate::compute::ParsedSpec,
endpoint_id: Option<String>,
) -> Result<Self> {
let endpoint_id = endpoint_id.as_ref().or(pspec.spec.endpoint_id.as_ref());
let Some(ref endpoint_id) = endpoint_id else {
bail!("pspec.endpoint_id missing, other endpoint_id not provided")
};
let Some(ref base_uri) = pspec.endpoint_storage_addr else {
bail!("pspec.endpoint_storage_addr missing")
@@ -84,7 +89,7 @@ impl ComputeNode {
}
/// Returns false if there is a prewarm request ongoing, true otherwise
pub fn prewarm_lfc(self: &Arc<Self>) -> bool {
pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
crate::metrics::LFC_PREWARM_REQUESTS.inc();
{
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
@@ -97,7 +102,7 @@ impl ComputeNode {
let cloned = self.clone();
spawn(async move {
let Err(err) = cloned.prewarm_impl().await else {
let Err(err) = cloned.prewarm_impl(from_endpoint).await else {
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
return;
};
@@ -109,13 +114,14 @@ impl ComputeNode {
true
}
fn endpoint_storage_pair(&self) -> Result<EndpointStoragePair> {
/// from_endpoint: None for endpoint managed by this compute_ctl
fn endpoint_storage_pair(&self, from_endpoint: Option<String>) -> Result<EndpointStoragePair> {
let state = self.state.lock().unwrap();
state.pspec.as_ref().unwrap().try_into()
EndpointStoragePair::from_spec_and_endpoint(state.pspec.as_ref().unwrap(), from_endpoint)
}
async fn prewarm_impl(&self) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
info!(%url, "requesting LFC state from endpoint storage");
let request = Client::new().get(&url).bearer_auth(token);
@@ -173,7 +179,7 @@ impl ComputeNode {
}
async fn offload_lfc_impl(&self) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
info!(%url, "requesting LFC state from postgres");
let mut compressed = Vec::new();

View File

@@ -2,6 +2,7 @@ use crate::compute_prewarm::LfcPrewarmStateWithProgress;
use crate::http::JsonResponse;
use axum::response::{IntoResponse, Response};
use axum::{Json, http::StatusCode};
use axum_extra::extract::OptionalQuery;
use compute_api::responses::LfcOffloadState;
type Compute = axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>;
@@ -16,8 +17,16 @@ pub(in crate::http) async fn offload_state(compute: Compute) -> Json<LfcOffloadS
Json(compute.lfc_offload_state())
}
pub(in crate::http) async fn prewarm(compute: Compute) -> Response {
if compute.prewarm_lfc() {
#[derive(serde::Deserialize)]
pub struct PrewarmQuery {
pub from_endpoint: String,
}
pub(in crate::http) async fn prewarm(
compute: Compute,
OptionalQuery(query): OptionalQuery<PrewarmQuery>,
) -> Response {
if compute.prewarm_lfc(query.map(|q| q.from_endpoint)) {
StatusCode::ACCEPTED.into_response()
} else {
JsonResponse::error(

View File

@@ -2085,9 +2085,6 @@ communicator_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber ba
start_ts = GetCurrentTimestamp();
if (RecoveryInProgress() && MyBackendType != B_STARTUP)
XLogWaitForReplayOf(reqlsns->request_lsn);
/*
* Try to find prefetched page in the list of received pages.
*/

View File

@@ -2,6 +2,6 @@ DROP FUNCTION IF EXISTS get_prewarm_info(out total_pages integer, out prewarmed_
DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer);
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, n_workers integer default 1);
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, n_workers integer);

View File

@@ -2382,6 +2382,8 @@ get_fsm_physical_block(BlockNumber heapblk)
static bool
neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
{
static XLogRecPtr last_recptr = InvalidXLogRecPtr;
static bool last_no_redo_needed;
XLogRecPtr end_recptr = record->EndRecPtr;
NRelFileInfo rinfo;
ForkNumber forknum;
@@ -2390,69 +2392,88 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
uint32 hash;
LWLock *partitionLock;
int buf_id;
bool no_redo_needed;
bool no_redo_needed = true;
if (old_redo_read_buffer_filter && old_redo_read_buffer_filter(record, block_id))
return true;
#if PG_VERSION_NUM < 150000
if (!XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno))
neon_log(PANIC, "failed to locate backup block with ID %d", block_id);
#else
XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno);
#endif
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forknum;
tag.blockNum = blkno;
hash = BufTableHashCode(&tag);
partitionLock = BufMappingPartitionLock(hash);
/*
* Lock the partition of shared_buffers so that it can't be updated
* concurrently.
*/
LWLockAcquire(partitionLock, LW_SHARED);
/*
* Out of an abundance of caution, we always run redo on shared catalogs,
* regardless of whether the block is stored in shared buffers. See also
* this function's top comment.
*/
if (!OidIsValid(NInfoGetDbOid(rinfo)))
if (last_recptr != end_recptr)
{
no_redo_needed = false;
#if PG_VERSION_NUM < 150000
int max_block_id = record->max_block_id;
#else
int max_block_id = XLogRecMaxBlockId(record);
#endif
for (int block_id = 0; block_id <= max_block_id && no_redo_needed; block_id++)
{
if (XLogRecHasBlockRef(record, block_id))
{
#if PG_VERSION_NUM < 150000
if (!XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno))
neon_log(PANIC, "failed to locate backup block with ID %d", block_id);
#else
XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno);
#endif
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forknum;
tag.blockNum = blkno;
hash = BufTableHashCode(&tag);
partitionLock = BufMappingPartitionLock(hash);
/*
* Lock the partition of shared_buffers so that it can't be updated
* concurrently.
*/
LWLockAcquire(partitionLock, LW_SHARED);
/*
* Out of an abundance of caution, we always run redo on shared catalogs,
* regardless of whether the block is stored in shared buffers. See also
* this function's top comment.
*/
if (!OidIsValid(NInfoGetDbOid(rinfo)))
{
no_redo_needed = false;
}
else
{
/* Try to find the relevant buffer */
buf_id = BufTableLookup(&tag, hash);
no_redo_needed = buf_id < 0;
}
/*
* we don't have the buffer in memory, update lwLsn past this record, also
* evict page from file cache
*/
if (no_redo_needed)
{
neon_set_lwlsn_block(end_recptr, rinfo, forknum, blkno);
/*
* Redo changes if page exists in LFC.
* We should perform this check after assigning LwLSN to prevent
* prefetching of some older version of the page by some other backend.
*/
no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno);
}
LWLockRelease(partitionLock);
neon_extend_rel_size(rinfo, forknum, blkno, end_recptr);
if (forknum == MAIN_FORKNUM)
{
neon_extend_rel_size(rinfo, FSM_FORKNUM, get_fsm_physical_block(blkno), end_recptr);
}
}
}
last_recptr = end_recptr;
last_no_redo_needed = no_redo_needed;
}
else
{
/* Try to find the relevant buffer */
buf_id = BufTableLookup(&tag, hash);
no_redo_needed = buf_id < 0;
}
/*
* we don't have the buffer in memory, update lwLsn past this record, also
* evict page from file cache
*/
if (no_redo_needed)
{
neon_set_lwlsn_block(end_recptr, rinfo, forknum, blkno);
/*
* Redo changes if page exists in LFC.
* We should perform this check after assigning LwLSN to prevent
* prefetching of some older version of the page by some other backend.
*/
no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno);
}
LWLockRelease(partitionLock);
neon_extend_rel_size(rinfo, forknum, blkno, end_recptr);
if (forknum == MAIN_FORKNUM)
{
neon_extend_rel_size(rinfo, FSM_FORKNUM, get_fsm_physical_block(blkno), end_recptr);
no_redo_needed = last_no_redo_needed;
}
return no_redo_needed;
}

View File

@@ -14,12 +14,13 @@ use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{debug, info};
use crate::auth::{self, AuthError, ComputeUserInfoMaybeEndpoint, validate_password_and_exchange};
use crate::auth::{self, ComputeUserInfoMaybeEndpoint, validate_password_and_exchange};
use crate::cache::Cached;
use crate::config::AuthenticationConfig;
use crate::context::RequestContext;
use crate::control_plane::client::ControlPlaneClient;
use crate::control_plane::errors::GetAuthInfoError;
use crate::control_plane::messages::EndpointRateLimitConfig;
use crate::control_plane::{
self, AccessBlockerFlags, AuthSecret, CachedNodeInfo, ControlPlaneApi, EndpointAccessControl,
RoleAccessControl,
@@ -230,11 +231,8 @@ async fn auth_quirks(
config.is_vpc_acccess_proxy,
)?;
let endpoint = EndpointIdInt::from(&info.endpoint);
let rate_limit_config = None;
if !endpoint_rate_limiter.check(endpoint, rate_limit_config, 1) {
return Err(AuthError::too_many_connections());
}
access_controls.connection_attempt_rate_limit(ctx, &info.endpoint, &endpoint_rate_limiter)?;
let role_access = api
.get_role_access_control(ctx, &info.endpoint, &info.user)
.await?;
@@ -401,6 +399,7 @@ impl Backend<'_, ComputeUserInfo> {
allowed_ips: Arc::new(vec![]),
allowed_vpce: Arc::new(vec![]),
flags: AccessBlockerFlags::default(),
rate_limits: EndpointRateLimitConfig::default(),
}),
}
}
@@ -439,6 +438,7 @@ mod tests {
use crate::auth::{ComputeUserInfoMaybeEndpoint, IpPattern};
use crate::config::AuthenticationConfig;
use crate::context::RequestContext;
use crate::control_plane::messages::EndpointRateLimitConfig;
use crate::control_plane::{
self, AccessBlockerFlags, CachedNodeInfo, EndpointAccessControl, RoleAccessControl,
};
@@ -477,6 +477,7 @@ mod tests {
allowed_ips: Arc::new(self.ips.clone()),
allowed_vpce: Arc::new(self.vpc_endpoint_ids.clone()),
flags: self.access_blocker_flags,
rate_limits: EndpointRateLimitConfig::default(),
})
}

View File

@@ -364,6 +364,7 @@ mod tests {
use std::sync::Arc;
use super::*;
use crate::control_plane::messages::EndpointRateLimitConfig;
use crate::control_plane::{AccessBlockerFlags, AuthSecret};
use crate::scram::ServerSecret;
use crate::types::ProjectId;
@@ -399,6 +400,7 @@ mod tests {
allowed_ips: allowed_ips.clone(),
allowed_vpce: Arc::new(vec![]),
flags: AccessBlockerFlags::default(),
rate_limits: EndpointRateLimitConfig::default(),
},
RoleAccessControl {
secret: secret1.clone(),
@@ -414,6 +416,7 @@ mod tests {
allowed_ips: allowed_ips.clone(),
allowed_vpce: Arc::new(vec![]),
flags: AccessBlockerFlags::default(),
rate_limits: EndpointRateLimitConfig::default(),
},
RoleAccessControl {
secret: secret2.clone(),
@@ -439,6 +442,7 @@ mod tests {
allowed_ips: allowed_ips.clone(),
allowed_vpce: Arc::new(vec![]),
flags: AccessBlockerFlags::default(),
rate_limits: EndpointRateLimitConfig::default(),
},
RoleAccessControl {
secret: secret3.clone(),

View File

@@ -146,6 +146,7 @@ impl NeonControlPlaneClient {
public_access_blocked: block_public_connections,
vpc_access_blocked: block_vpc_connections,
},
rate_limits: body.rate_limits,
})
}
.inspect_err(|e| tracing::debug!(error = ?e))
@@ -312,6 +313,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
allowed_ips: Arc::new(auth_info.allowed_ips),
allowed_vpce: Arc::new(auth_info.allowed_vpc_endpoint_ids),
flags: auth_info.access_blocker_flags,
rate_limits: auth_info.rate_limits,
};
let role_control = RoleAccessControl {
secret: auth_info.secret,
@@ -357,6 +359,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
allowed_ips: Arc::new(auth_info.allowed_ips),
allowed_vpce: Arc::new(auth_info.allowed_vpc_endpoint_ids),
flags: auth_info.access_blocker_flags,
rate_limits: auth_info.rate_limits,
};
let role_control = RoleAccessControl {
secret: auth_info.secret,

View File

@@ -20,7 +20,7 @@ use crate::context::RequestContext;
use crate::control_plane::errors::{
ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError,
};
use crate::control_plane::messages::MetricsAuxInfo;
use crate::control_plane::messages::{EndpointRateLimitConfig, MetricsAuxInfo};
use crate::control_plane::{
AccessBlockerFlags, AuthInfo, AuthSecret, CachedNodeInfo, EndpointAccessControl, NodeInfo,
RoleAccessControl,
@@ -130,6 +130,7 @@ impl MockControlPlane {
project_id: None,
account_id: None,
access_blocker_flags: AccessBlockerFlags::default(),
rate_limits: EndpointRateLimitConfig::default(),
})
}
@@ -233,6 +234,7 @@ impl super::ControlPlaneApi for MockControlPlane {
allowed_ips: Arc::new(info.allowed_ips),
allowed_vpce: Arc::new(info.allowed_vpc_endpoint_ids),
flags: info.access_blocker_flags,
rate_limits: info.rate_limits,
})
}

View File

@@ -10,6 +10,7 @@ use clashmap::ClashMap;
use tokio::time::Instant;
use tracing::{debug, info};
use super::{EndpointAccessControl, RoleAccessControl};
use crate::auth::backend::ComputeUserInfo;
use crate::auth::backend::jwt::{AuthRule, FetchAuthRules, FetchAuthRulesError};
use crate::cache::endpoints::EndpointsCache;
@@ -22,8 +23,6 @@ use crate::metrics::ApiLockMetrics;
use crate::rate_limiter::{DynamicLimiter, Outcome, RateLimiterConfig, Token};
use crate::types::EndpointId;
use super::{EndpointAccessControl, RoleAccessControl};
#[non_exhaustive]
#[derive(Clone)]
pub enum ControlPlaneClient {

View File

@@ -227,12 +227,35 @@ pub(crate) struct UserFacingMessage {
#[derive(Deserialize)]
pub(crate) struct GetEndpointAccessControl {
pub(crate) role_secret: Box<str>,
pub(crate) allowed_ips: Option<Vec<IpPattern>>,
pub(crate) allowed_vpc_endpoint_ids: Option<Vec<String>>,
pub(crate) project_id: Option<ProjectIdInt>,
pub(crate) account_id: Option<AccountIdInt>,
pub(crate) allowed_ips: Option<Vec<IpPattern>>,
pub(crate) allowed_vpc_endpoint_ids: Option<Vec<String>>,
pub(crate) block_public_connections: Option<bool>,
pub(crate) block_vpc_connections: Option<bool>,
#[serde(default)]
pub(crate) rate_limits: EndpointRateLimitConfig,
}
#[derive(Copy, Clone, Deserialize, Default)]
pub struct EndpointRateLimitConfig {
pub connection_attempts: ConnectionAttemptsLimit,
}
#[derive(Copy, Clone, Deserialize, Default)]
pub struct ConnectionAttemptsLimit {
pub tcp: Option<LeakyBucketSetting>,
pub ws: Option<LeakyBucketSetting>,
pub http: Option<LeakyBucketSetting>,
}
#[derive(Copy, Clone, Deserialize)]
pub struct LeakyBucketSetting {
pub rps: f64,
pub burst: f64,
}
/// Response which holds compute node's `host:port` pair.

View File

@@ -11,6 +11,8 @@ pub(crate) mod errors;
use std::sync::Arc;
use messages::EndpointRateLimitConfig;
use crate::auth::backend::ComputeUserInfo;
use crate::auth::backend::jwt::AuthRule;
use crate::auth::{AuthError, IpPattern, check_peer_addr_is_in_list};
@@ -18,8 +20,9 @@ use crate::cache::{Cached, TimedLru};
use crate::config::ComputeConfig;
use crate::context::RequestContext;
use crate::control_plane::messages::{ControlPlaneErrorMessage, MetricsAuxInfo};
use crate::intern::{AccountIdInt, ProjectIdInt};
use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt};
use crate::protocol2::ConnectionInfoExtra;
use crate::rate_limiter::{EndpointRateLimiter, LeakyBucketConfig};
use crate::types::{EndpointCacheKey, EndpointId, RoleName};
use crate::{compute, scram};
@@ -56,6 +59,8 @@ pub(crate) struct AuthInfo {
pub(crate) account_id: Option<AccountIdInt>,
/// Are public connections or VPC connections blocked?
pub(crate) access_blocker_flags: AccessBlockerFlags,
/// The rate limits for this endpoint.
pub(crate) rate_limits: EndpointRateLimitConfig,
}
/// Info for establishing a connection to a compute node.
@@ -101,6 +106,8 @@ pub struct EndpointAccessControl {
pub allowed_ips: Arc<Vec<IpPattern>>,
pub allowed_vpce: Arc<Vec<String>>,
pub flags: AccessBlockerFlags,
pub rate_limits: EndpointRateLimitConfig,
}
impl EndpointAccessControl {
@@ -139,6 +146,36 @@ impl EndpointAccessControl {
Ok(())
}
pub fn connection_attempt_rate_limit(
&self,
ctx: &RequestContext,
endpoint: &EndpointId,
rate_limiter: &EndpointRateLimiter,
) -> Result<(), AuthError> {
let endpoint = EndpointIdInt::from(endpoint);
let limits = &self.rate_limits.connection_attempts;
let config = match ctx.protocol() {
crate::metrics::Protocol::Http => limits.http,
crate::metrics::Protocol::Ws => limits.ws,
crate::metrics::Protocol::Tcp => limits.tcp,
crate::metrics::Protocol::SniRouter => return Ok(()),
};
let config = config.and_then(|config| {
if config.rps <= 0.0 || config.burst <= 0.0 {
return None;
}
Some(LeakyBucketConfig::new(config.rps, config.burst))
});
if !rate_limiter.check(endpoint, config, 1) {
return Err(AuthError::too_many_connections());
}
Ok(())
}
}
/// This will allocate per each call, but the http requests alone

View File

@@ -69,9 +69,8 @@ pub struct LeakyBucketConfig {
pub max: f64,
}
#[cfg(test)]
impl LeakyBucketConfig {
pub(crate) fn new(rps: f64, max: f64) -> Self {
pub fn new(rps: f64, max: f64) -> Self {
assert!(rps > 0.0, "rps must be positive");
assert!(max > 0.0, "max must be positive");
Self { rps, max }

View File

@@ -12,11 +12,10 @@ use rand::{Rng, SeedableRng};
use tokio::time::{Duration, Instant};
use tracing::info;
use super::LeakyBucketConfig;
use crate::ext::LockExt;
use crate::intern::EndpointIdInt;
use super::LeakyBucketConfig;
pub struct GlobalRateLimiter {
data: Vec<RateBucket>,
info: Vec<RateBucketInfo>,

View File

@@ -68,17 +68,20 @@ impl PoolingBackend {
self.config.authentication_config.is_vpc_acccess_proxy,
)?;
let ep = EndpointIdInt::from(&user_info.endpoint);
let rate_limit_config = None;
if !self.endpoint_rate_limiter.check(ep, rate_limit_config, 1) {
return Err(AuthError::too_many_connections());
}
access_control.connection_attempt_rate_limit(
ctx,
&user_info.endpoint,
&self.endpoint_rate_limiter,
)?;
let role_access = backend.get_role_secret(ctx).await?;
let Some(secret) = role_access.secret else {
// If we don't have an authentication secret, for the http flow we can just return an error.
info!("authentication info not found");
return Err(AuthError::password_failed(&*user_info.user));
};
let ep = EndpointIdInt::from(&user_info.endpoint);
let auth_outcome = crate::auth::validate_password_and_exchange(
&self.config.authentication_config.thread_pool,
ep,

View File

@@ -69,8 +69,10 @@ class EndpointHttpClient(requests.Session):
json: dict[str, str] = res.json()
return json
def prewarm_lfc(self):
self.post(f"http://localhost:{self.external_port}/lfc/prewarm").raise_for_status()
def prewarm_lfc(self, from_endpoint_id: str | None = None):
url: str = f"http://localhost:{self.external_port}/lfc/prewarm"
params = {"from_endpoint": from_endpoint_id} if from_endpoint_id else dict()
self.post(url, params=params).raise_for_status()
def prewarmed():
json = self.prewarm_lfc_status()

View File

@@ -129,6 +129,18 @@ class NeonAPI:
return cast("dict[str, Any]", resp.json())
def get_project_limits(self, project_id: str) -> dict[str, Any]:
resp = self.__request(
"GET",
f"/projects/{project_id}/limits",
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
)
return cast("dict[str, Any]", resp.json())
def delete_project(
self,
project_id: str,

View File

@@ -45,6 +45,8 @@ class NeonEndpoint:
if self.branch.connect_env:
self.connect_env = self.branch.connect_env.copy()
self.connect_env["PGHOST"] = self.host
if self.type == "read_only":
self.project.read_only_endpoints_total += 1
def delete(self):
self.project.delete_endpoint(self.id)
@@ -228,8 +230,13 @@ class NeonProject:
self.benchmarks: dict[str, subprocess.Popen[Any]] = {}
self.restore_num: int = 0
self.restart_pgbench_on_console_errors: bool = False
self.limits: dict[str, Any] = self.get_limits()["limits"]
self.read_only_endpoints_total: int = 0
def delete(self):
def get_limits(self) -> dict[str, Any]:
return self.neon_api.get_project_limits(self.id)
def delete(self) -> None:
self.neon_api.delete_project(self.id)
def create_branch(self, parent_id: str | None = None) -> NeonBranch | None:
@@ -282,6 +289,7 @@ class NeonProject:
self.neon_api.delete_endpoint(self.id, endpoint_id)
self.endpoints[endpoint_id].branch.endpoints.pop(endpoint_id)
self.endpoints.pop(endpoint_id)
self.read_only_endpoints_total -= 1
self.wait()
def start_benchmark(self, target: str, clients: int = 10) -> subprocess.Popen[Any]:
@@ -369,49 +377,64 @@ def setup_class(
print(f"::warning::Retried on 524 error {neon_api.retries524} times")
if neon_api.retries4xx > 0:
print(f"::warning::Retried on 4xx error {neon_api.retries4xx} times")
log.info("Removing the project")
log.info("Removing the project %s", project.id)
project.delete()
def do_action(project: NeonProject, action: str) -> None:
def do_action(project: NeonProject, action: str) -> bool:
"""
Runs the action
"""
log.info("Action: %s", action)
if action == "new_branch":
log.info("Trying to create a new branch")
if 0 <= project.limits["max_branches"] <= len(project.branches):
log.info(
"Maximum branch limit exceeded (%s of %s)",
len(project.branches),
project.limits["max_branches"],
)
return False
parent = project.branches[
random.choice(list(set(project.branches.keys()) - project.reset_branches))
]
log.info("Parent: %s", parent)
child = parent.create_child_branch()
if child is None:
return
return False
log.info("Created branch %s", child)
child.start_benchmark()
elif action == "delete_branch":
if project.leaf_branches:
target = random.choice(list(project.leaf_branches.values()))
target: NeonBranch = random.choice(list(project.leaf_branches.values()))
log.info("Trying to delete branch %s", target)
target.delete()
else:
log.info("Leaf branches not found, skipping")
return False
elif action == "new_ro_endpoint":
if 0 <= project.limits["max_read_only_endpoints"] <= project.read_only_endpoints_total:
log.info(
"Maximum read only endpoint limit exceeded (%s of %s)",
project.read_only_endpoints_total,
project.limits["max_read_only_endpoints"],
)
return False
ep = random.choice(
[br for br in project.branches.values() if br.id not in project.reset_branches]
).create_ro_endpoint()
log.info("Created the RO endpoint with id %s branch: %s", ep.id, ep.branch.id)
ep.start_benchmark()
elif action == "delete_ro_endpoint":
if project.read_only_endpoints_total == 0:
log.info("no read_only endpoints present, skipping")
return False
ro_endpoints: list[NeonEndpoint] = [
endpoint for endpoint in project.endpoints.values() if endpoint.type == "read_only"
]
if ro_endpoints:
target_ep: NeonEndpoint = random.choice(ro_endpoints)
target_ep.delete()
log.info("endpoint %s deleted", target_ep.id)
else:
log.info("no read_only endpoints present, skipping")
target_ep: NeonEndpoint = random.choice(ro_endpoints)
target_ep.delete()
log.info("endpoint %s deleted", target_ep.id)
elif action == "restore_random_time":
if project.leaf_branches:
br: NeonBranch = random.choice(list(project.leaf_branches.values()))
@@ -419,8 +442,10 @@ def do_action(project: NeonProject, action: str) -> None:
br.restore_random_time()
else:
log.info("No leaf branches found")
return False
else:
raise ValueError(f"The action {action} is unknown")
return True
@pytest.mark.timeout(7200)
@@ -457,8 +482,9 @@ def test_api_random(
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", "-s100"], env=project.main_branch.connect_env)
for _ in range(num_operations):
log.info("Starting action #%s", _ + 1)
do_action(
while not do_action(
project, random.choices([a[0] for a in ACTIONS], weights=[w[1] for w in ACTIONS])[0]
)
):
log.info("Retrying...")
project.check_all_benchmarks()
assert True

View File

@@ -188,7 +188,8 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, query: LfcQueryMet
pg_cur.execute("select pg_reload_conf()")
if query is LfcQueryMethod.COMPUTE_CTL:
http_client.prewarm_lfc()
# Same thing as prewarm_lfc(), testing other method
http_client.prewarm_lfc(endpoint.endpoint_id)
else:
pg_cur.execute("select prewarm_local_cache(%s)", (lfc_state,))