Compare commits

..

2 Commits

Author SHA1 Message Date
Conrad Ludgate
0579533ac8 remove dynamic limiter internal Box<dyn> 2025-04-02 12:45:04 +01:00
Conrad Ludgate
99c0da9607 proxy: simplify dynamic limiter impl 2025-04-02 12:40:45 +01:00
4 changed files with 42 additions and 65 deletions

View File

@@ -369,7 +369,7 @@ FROM build-deps AS plv8-src
ARG PG_VERSION
WORKDIR /ext-src
COPY compute/patches/plv8* .
COPY compute/patches/plv8-3.1.10.patch .
# plv8 3.2.3 supports v17
# last release v3.2.3 - Sep 7, 2024
@@ -393,7 +393,7 @@ RUN case "${PG_VERSION:?}" in \
git clone --recurse-submodules --depth 1 --branch ${PLV8_TAG} https://github.com/plv8/plv8.git plv8-src && \
tar -czf plv8.tar.gz --exclude .git plv8-src && \
cd plv8-src && \
if [[ "${PG_VERSION:?}" < "v17" ]]; then patch -p1 < /ext-src/plv8_v3.1.10.patch; else patch -p1 < /ext-src/plv8_v3.2.3.patch; fi
if [[ "${PG_VERSION:?}" < "v17" ]]; then patch -p1 < /ext-src/plv8-3.1.10.patch; fi
# Step 1: Build the vendored V8 engine. It doesn't depend on PostgreSQL, so use
# 'build-deps' as the base. This enables caching and avoids unnecessary rebuilds.

View File

@@ -1,6 +1,12 @@
commit 46b38d3e46f9cd6c70d9b189dd6ff4abaa17cf5e
Author: Alexander Bayandin <alexander@neon.tech>
Date: Sat Nov 30 18:29:32 2024 +0000
Fix v8 9.7.37 compilation on Debian 12
diff --git a/patches/code/84cf3230a9680aac3b73c410c2b758760b6d3066.patch b/patches/code/84cf3230a9680aac3b73c410c2b758760b6d3066.patch
new file mode 100644
index 0000000..fae1cb3
index 0000000..f0a5dc7
--- /dev/null
+++ b/patches/code/84cf3230a9680aac3b73c410c2b758760b6d3066.patch
@@ -0,0 +1,30 @@
@@ -29,21 +35,8 @@ index 0000000..fae1cb3
+@@ -5,6 +5,7 @@
+ #ifndef V8_HEAP_CPPGC_PREFINALIZER_HANDLER_H_
+ #define V8_HEAP_CPPGC_PREFINALIZER_HANDLER_H_
+
+
++#include <utility>
+ #include <vector>
+
+
+ #include "include/cppgc/prefinalizer.h"
diff --git a/plv8.cc b/plv8.cc
index c1ce883..6e47e94 100644
--- a/plv8.cc
+++ b/plv8.cc
@@ -379,7 +379,7 @@ _PG_init(void)
NULL,
&plv8_v8_flags,
NULL,
- PGC_USERSET, 0,
+ PGC_SUSET, 0,
#if PG_VERSION_NUM >= 90100
NULL,
#endif

View File

@@ -1,13 +0,0 @@
diff --git a/plv8.cc b/plv8.cc
index edfa2aa..623e7f2 100644
--- a/plv8.cc
+++ b/plv8.cc
@@ -385,7 +385,7 @@ _PG_init(void)
NULL,
&plv8_v8_flags,
NULL,
- PGC_USERSET, 0,
+ PGC_SUSET, 0,
#if PG_VERSION_NUM >= 90100
NULL,
#endif

View File

@@ -65,23 +65,13 @@ pub struct RateLimiterConfig {
pub(crate) initial_limit: usize,
}
impl RateLimiterConfig {
pub(crate) fn create_rate_limit_algorithm(self) -> Box<dyn LimitAlgorithm> {
match self.algorithm {
RateLimitAlgorithm::Fixed => Box::new(Fixed),
RateLimitAlgorithm::Aimd { conf } => Box::new(conf),
}
}
}
pub(crate) struct LimiterInner {
alg: Box<dyn LimitAlgorithm>,
available: usize,
pub(crate) struct LimiterInner<L: ?Sized> {
limit: usize,
in_flight: usize,
alg: L,
}
impl LimiterInner {
impl<L: ?Sized + LimitAlgorithm> LimiterInner<L> {
fn update_limit(&mut self, latency: Duration, outcome: Option<Outcome>) {
if let Some(outcome) = outcome {
let sample = Sample {
@@ -94,14 +84,13 @@ impl LimiterInner {
}
fn take(&mut self, ready: &Notify) -> Option<()> {
if self.available >= 1 {
self.available -= 1;
if self.in_flight < self.limit {
self.in_flight += 1;
// tell the next in the queue that there is a permit ready
if self.available >= 1 {
if self.in_flight < self.limit {
ready.notify_one();
}
Some(())
} else {
None
@@ -116,11 +105,11 @@ impl LimiterInner {
///
/// The limit will be automatically adjusted based on observed latency (delay) and/or failures
/// caused by overload (loss).
pub(crate) struct DynamicLimiter {
config: RateLimiterConfig,
inner: Mutex<LimiterInner>,
pub(crate) struct DynamicLimiter<L: ?Sized = dyn LimitAlgorithm> {
disabled: bool,
// to notify when a token is available
ready: Notify,
inner: Mutex<LimiterInner<L>>,
}
/// A concurrency token, required to run a job.
@@ -140,37 +129,46 @@ struct LimiterState {
limit: usize,
}
impl DynamicLimiter {
/// Create a limiter with a given limit control algorithm.
pub(crate) fn new(config: RateLimiterConfig) -> Arc<Self> {
impl<L> DynamicLimiter<L> {
pub(crate) fn new_inner(initial_limit: usize, alg: L) -> Arc<Self> {
let ready = Notify::new();
ready.notify_one();
Arc::new(Self {
disabled: initial_limit == 0,
inner: Mutex::new(LimiterInner {
alg: config.create_rate_limit_algorithm(),
available: config.initial_limit,
limit: config.initial_limit,
limit: initial_limit,
in_flight: 0,
alg,
}),
ready,
config,
})
}
}
impl DynamicLimiter {
/// Create a limiter with a given limit control algorithm.
pub(crate) fn new(config: RateLimiterConfig) -> Arc<Self> {
let initial_limit = config.initial_limit;
match config.algorithm {
RateLimitAlgorithm::Fixed => DynamicLimiter::new_inner(initial_limit, Fixed),
RateLimitAlgorithm::Aimd { conf } => DynamicLimiter::new_inner(initial_limit, conf),
}
}
/// Try to acquire a concurrency [Token], waiting for `duration` if there are none available.
pub(crate) async fn acquire_timeout(
self: &Arc<Self>,
duration: Duration,
) -> Result<Token, Elapsed> {
tokio::time::timeout(duration, self.acquire()).await?
tokio::time::timeout(duration, self.acquire()).await
}
/// Try to acquire a concurrency [Token].
async fn acquire(self: &Arc<Self>) -> Result<Token, Elapsed> {
if self.config.initial_limit == 0 {
async fn acquire(self: &Arc<Self>) -> Token {
if self.disabled {
// If the rate limiter is disabled, we can always acquire a token.
Ok(Token::disabled())
Token::disabled()
} else {
let mut notified = pin!(self.ready.notified());
let mut ready = notified.as_mut().enable();
@@ -178,7 +176,7 @@ impl DynamicLimiter {
if ready {
let mut inner = self.inner.lock();
if inner.take(&self.ready).is_some() {
break Ok(Token::new(self.clone()));
break Token::new(self.clone());
}
notified.set(self.ready.notified());
}
@@ -200,7 +198,7 @@ impl DynamicLimiter {
} else {
tracing::debug!("outcome is {:?}", outcome);
}
if self.config.initial_limit == 0 {
if self.disabled {
return;
}
@@ -210,7 +208,6 @@ impl DynamicLimiter {
inner.in_flight -= 1;
if inner.in_flight < inner.limit {
inner.available = inner.limit - inner.in_flight;
// At least 1 permit is now available
self.ready.notify_one();
}