mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-29 19:10:38 +00:00
improve proxy code cov (#6141)
## Summary of changes saw some low-hanging codecov improvements. even if code coverage is somewhat of a pointless game, might as well add tests where we can and delete code if it's unused
This commit is contained in:
@@ -33,39 +33,6 @@ impl Aimd {
|
||||
min_utilisation_threshold: config.aimd_min_utilisation_threshold,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn decrease_factor(self, factor: f32) -> Self {
|
||||
assert!((0.5..1.0).contains(&factor));
|
||||
Self {
|
||||
decrease_factor: factor,
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
pub fn increase_by(self, increase: usize) -> Self {
|
||||
assert!(increase > 0);
|
||||
Self {
|
||||
increase_by: increase,
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_max_limit(self, max: usize) -> Self {
|
||||
assert!(max > 0);
|
||||
Self {
|
||||
max_limit: max,
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
/// A threshold below which the limit won't be increased. 0.5 = 50%.
|
||||
pub fn with_min_utilisation_threshold(self, min_util: f32) -> Self {
|
||||
assert!(min_util > 0. && min_util < 1.);
|
||||
Self {
|
||||
min_utilisation_threshold: min_util,
|
||||
..self
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
||||
@@ -1,12 +1,16 @@
|
||||
use std::sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
use std::{
|
||||
collections::hash_map::RandomState,
|
||||
hash::BuildHasher,
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc, Mutex,
|
||||
},
|
||||
};
|
||||
|
||||
use anyhow::bail;
|
||||
use dashmap::DashMap;
|
||||
use itertools::Itertools;
|
||||
use rand::{thread_rng, Rng};
|
||||
use rand::{rngs::StdRng, Rng, SeedableRng};
|
||||
use smol_str::SmolStr;
|
||||
use tokio::sync::{Mutex as AsyncMutex, Semaphore, SemaphorePermit};
|
||||
use tokio::time::{timeout, Duration, Instant};
|
||||
@@ -28,10 +32,11 @@ use super::{
|
||||
// saw SNI, before doing TLS handshake. User-side error messages in that case
|
||||
// does not look very nice (`SSL SYSCALL error: Undefined error: 0`), so for now
|
||||
// I went with a more expensive way that yields user-friendlier error messages.
|
||||
pub struct EndpointRateLimiter {
|
||||
map: DashMap<SmolStr, Vec<RateBucket>>,
|
||||
pub struct EndpointRateLimiter<Rand = StdRng, Hasher = RandomState> {
|
||||
map: DashMap<SmolStr, Vec<RateBucket>, Hasher>,
|
||||
info: &'static [RateBucketInfo],
|
||||
access_count: AtomicUsize,
|
||||
rand: Mutex<Rand>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
@@ -125,11 +130,18 @@ impl RateBucketInfo {
|
||||
|
||||
impl EndpointRateLimiter {
|
||||
pub fn new(info: &'static [RateBucketInfo]) -> Self {
|
||||
Self::new_with_rand_and_hasher(info, StdRng::from_entropy(), RandomState::new())
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: Rng, S: BuildHasher + Clone> EndpointRateLimiter<R, S> {
|
||||
fn new_with_rand_and_hasher(info: &'static [RateBucketInfo], rand: R, hasher: S) -> Self {
|
||||
info!(buckets = ?info, "endpoint rate limiter");
|
||||
Self {
|
||||
info,
|
||||
map: DashMap::with_shard_amount(64),
|
||||
map: DashMap::with_hasher_and_shard_amount(hasher, 64),
|
||||
access_count: AtomicUsize::new(1), // start from 1 to avoid GC on the first request
|
||||
rand: Mutex::new(rand),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -176,7 +188,9 @@ impl EndpointRateLimiter {
|
||||
self.map.len()
|
||||
);
|
||||
let n = self.map.shards().len();
|
||||
let shard = thread_rng().gen_range(0..n);
|
||||
// this lock is ok as the periodic cycle of do_gc makes this very unlikely to collide
|
||||
// (impossible, infact, unless we have 2048 threads)
|
||||
let shard = self.rand.lock().unwrap().gen_range(0..n);
|
||||
self.map.shards()[shard].write().clear();
|
||||
}
|
||||
}
|
||||
@@ -219,7 +233,6 @@ pub struct Token<'t> {
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct LimiterState {
|
||||
limit: usize,
|
||||
available: usize,
|
||||
in_flight: usize,
|
||||
}
|
||||
|
||||
@@ -397,11 +410,7 @@ impl Limiter {
|
||||
pub fn state(&self) -> LimiterState {
|
||||
let limit = self.limits.load(Ordering::Relaxed);
|
||||
let in_flight = self.in_flight.load(Ordering::Relaxed);
|
||||
LimiterState {
|
||||
limit,
|
||||
available: limit.saturating_sub(in_flight),
|
||||
in_flight,
|
||||
}
|
||||
LimiterState { limit, in_flight }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -414,13 +423,6 @@ impl<'t> Token<'t> {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn set_latency(&mut self, latency: Duration) {
|
||||
use std::ops::Sub;
|
||||
|
||||
self.start = Instant::now().sub(latency);
|
||||
}
|
||||
|
||||
pub fn forget(&mut self) {
|
||||
if let Some(permit) = self.permit.take() {
|
||||
permit.forget();
|
||||
@@ -439,10 +441,6 @@ impl LimiterState {
|
||||
pub fn limit(&self) -> usize {
|
||||
self.limit
|
||||
}
|
||||
/// The amount of concurrency available to use.
|
||||
pub fn available(&self) -> usize {
|
||||
self.available
|
||||
}
|
||||
/// The number of jobs in flight.
|
||||
pub fn in_flight(&self) -> usize {
|
||||
self.in_flight
|
||||
@@ -490,9 +488,11 @@ impl reqwest_middleware::Middleware for Limiter {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{pin::pin, task::Context, time::Duration};
|
||||
use std::{hash::BuildHasherDefault, pin::pin, task::Context, time::Duration};
|
||||
|
||||
use futures::{task::noop_waker_ref, Future};
|
||||
use rand::SeedableRng;
|
||||
use rustc_hash::FxHasher;
|
||||
use smol_str::SmolStr;
|
||||
use tokio::time;
|
||||
|
||||
@@ -690,4 +690,21 @@ mod tests {
|
||||
assert!(limiter.check(endpoint.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_rate_limits_gc() {
|
||||
// fixed seeded random/hasher to ensure that the test is not flaky
|
||||
let rand = rand::rngs::StdRng::from_seed([1; 32]);
|
||||
let hasher = BuildHasherDefault::<FxHasher>::default();
|
||||
|
||||
let limiter = EndpointRateLimiter::new_with_rand_and_hasher(
|
||||
&RateBucketInfo::DEFAULT_SET,
|
||||
rand,
|
||||
hasher,
|
||||
);
|
||||
for i in 0..1_000_000 {
|
||||
limiter.check(format!("{i}").into());
|
||||
}
|
||||
assert!(limiter.map.len() < 150_000);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,15 +27,15 @@ use sync_wrapper::SyncWrapper;
|
||||
pin_project! {
|
||||
/// This is a wrapper around a [`WebSocketStream`] that
|
||||
/// implements [`AsyncRead`] and [`AsyncWrite`].
|
||||
pub struct WebSocketRw {
|
||||
pub struct WebSocketRw<S = Upgraded> {
|
||||
#[pin]
|
||||
stream: SyncWrapper<WebSocketStream<Upgraded>>,
|
||||
stream: SyncWrapper<WebSocketStream<S>>,
|
||||
bytes: Bytes,
|
||||
}
|
||||
}
|
||||
|
||||
impl WebSocketRw {
|
||||
pub fn new(stream: WebSocketStream<Upgraded>) -> Self {
|
||||
impl<S> WebSocketRw<S> {
|
||||
pub fn new(stream: WebSocketStream<S>) -> Self {
|
||||
Self {
|
||||
stream: stream.into(),
|
||||
bytes: Bytes::new(),
|
||||
@@ -43,7 +43,7 @@ impl WebSocketRw {
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for WebSocketRw {
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for WebSocketRw<S> {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
@@ -69,7 +69,7 @@ impl AsyncWrite for WebSocketRw {
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncRead for WebSocketRw {
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for WebSocketRw<S> {
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
@@ -86,7 +86,7 @@ impl AsyncRead for WebSocketRw {
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncBufRead for WebSocketRw {
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncBufRead for WebSocketRw<S> {
|
||||
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
|
||||
// Please refer to poll_fill_buf's documentation.
|
||||
const EOF: Poll<io::Result<&[u8]>> = Poll::Ready(Ok(&[]));
|
||||
@@ -151,3 +151,60 @@ pub async fn serve_websocket(
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::pin::pin;
|
||||
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use hyper_tungstenite::{
|
||||
tungstenite::{protocol::Role, Message},
|
||||
WebSocketStream,
|
||||
};
|
||||
use tokio::{
|
||||
io::{duplex, AsyncReadExt, AsyncWriteExt},
|
||||
task::JoinSet,
|
||||
};
|
||||
|
||||
use super::WebSocketRw;
|
||||
|
||||
#[tokio::test]
|
||||
async fn websocket_stream_wrapper_happy_path() {
|
||||
let (stream1, stream2) = duplex(1024);
|
||||
|
||||
let mut js = JoinSet::new();
|
||||
|
||||
js.spawn(async move {
|
||||
let mut client = WebSocketStream::from_raw_socket(stream1, Role::Client, None).await;
|
||||
|
||||
client
|
||||
.send(Message::Binary(b"hello world".to_vec()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let message = client.next().await.unwrap().unwrap();
|
||||
assert_eq!(message, Message::Binary(b"websockets are cool".to_vec()));
|
||||
|
||||
client.close(None).await.unwrap();
|
||||
});
|
||||
|
||||
js.spawn(async move {
|
||||
let mut rw = pin!(WebSocketRw::new(
|
||||
WebSocketStream::from_raw_socket(stream2, Role::Server, None).await
|
||||
));
|
||||
|
||||
let mut buf = vec![0; 1024];
|
||||
let n = rw.read(&mut buf).await.unwrap();
|
||||
assert_eq!(&buf[..n], b"hello world");
|
||||
|
||||
rw.write_all(b"websockets are cool").await.unwrap();
|
||||
rw.flush().await.unwrap();
|
||||
|
||||
let n = rw.read_to_end(&mut buf).await.unwrap();
|
||||
assert_eq!(n, 0);
|
||||
});
|
||||
|
||||
js.join_next().await.unwrap().unwrap();
|
||||
js.join_next().await.unwrap().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user