mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 20:42:54 +00:00
proxy: improve test performance (#8863)
Some tests were very slow and some tests occasionally stalled. This PR improves some test performance and replaces the custom threadpool in order to fix the stalling of tests.
This commit is contained in:
@@ -500,6 +500,7 @@ mod tests {
|
||||
use hyper1::service::service_fn;
|
||||
use hyper_util::rt::TokioIo;
|
||||
use rand::rngs::OsRng;
|
||||
use rsa::pkcs8::DecodePrivateKey;
|
||||
use signature::Signer;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
@@ -517,8 +518,8 @@ mod tests {
|
||||
(sk, jwk)
|
||||
}
|
||||
|
||||
fn new_rsa_jwk(kid: String) -> (rsa::RsaPrivateKey, jose_jwk::Jwk) {
|
||||
let sk = rsa::RsaPrivateKey::new(&mut OsRng, 2048).unwrap();
|
||||
fn new_rsa_jwk(key: &str, kid: String) -> (rsa::RsaPrivateKey, jose_jwk::Jwk) {
|
||||
let sk = rsa::RsaPrivateKey::from_pkcs8_pem(key).unwrap();
|
||||
let pk = sk.to_public_key().into();
|
||||
let jwk = jose_jwk::Jwk {
|
||||
key: jose_jwk::Key::Rsa(pk),
|
||||
@@ -569,10 +570,70 @@ mod tests {
|
||||
format!("{payload}.{sig}")
|
||||
}
|
||||
|
||||
// RSA key gen is slow....
|
||||
const RS1: &str = "-----BEGIN PRIVATE KEY-----
|
||||
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDNuWBIWTlo+54Y
|
||||
aifpGInIrpv6LlsbI/2/2CC81Arlx4RsABORklgA9XSGwaCbHTshHsfd1S916JwA
|
||||
SpjyPQYWfqo6iAV8a4MhjIeJIkRr74prDCSzOGZvIc6VaGeCIb9clf3HSrPHm3hA
|
||||
cfLMB8/p5MgoxERPDOIn3XYoS9SEEuP7l0LkmEZMerg6W6lDjQRDny0Lb50Jky9X
|
||||
mDqnYXBhs99ranbwL5vjy0ba6OIeCWFJme5u+rv5C/P0BOYrJfGxIcEoKa8Ukw5s
|
||||
PlM+qrz9ope1eOuXMNNdyFDReNBUyaM1AwBAayU5rz57crer7K/UIofaJ42T4cMM
|
||||
nx/SWfBNAgMBAAECggEACqdpBxYn1PoC6/zDaFzu9celKEWyTiuE/qRwvZa1ocS9
|
||||
ZOJ0IPvVNud/S2NHsADJiSOQ8joSJScQvSsf1Ju4bv3MTw+wSQtAVUJz2nQ92uEi
|
||||
5/xPAkEPfP3hNvebNLAOuvrBk8qYmOPCTIQaMNrOt6wzeXkAmJ9wLuRXNCsJLHW+
|
||||
KLpf2WdgTYxqK06ZiJERFgJ2r1MsC2IgTydzjOAdEIrtMarerTLqqCpwFrk/l0cz
|
||||
1O2OAb17ZxmhuzMhjNMin81c8F2fZAGMeOjn92Jl5kUsYw/pG+0S8QKlbveR/fdP
|
||||
We2tJsgXw2zD0q7OJpp8NXS2yddrZGyysYsof983wQKBgQD2McqNJqo+eWL5zony
|
||||
UbL19loYw0M15EjhzIuzW1Jk0rPj65yQyzpJ6pqicRuWr34MvzCx+ZHM2b3jSiNu
|
||||
GES2fnC7xLIKyeRxfqsXF71xz+6UStEGRQX27r1YWEtyQVuBhvlqB+AGWP3PYAC+
|
||||
HecZecnZ+vcihJ2K3+l5O3paVQKBgQDV6vKH5h2SY9vgO8obx0P7XSS+djHhmPuU
|
||||
f8C/Fq6AuRbIA1g04pzuLU2WS9T26eIjgM173uVNg2TuqJveWzz+CAAp6nCR6l24
|
||||
DBg49lMGCWrMo4FqPG46QkUqvK8uSj42GkX/e5Rut1Gyu0209emeM6h2d2K15SvY
|
||||
9563tYSmGQKBgQDwcH5WTi20KA7e07TroJi8GKWzS3gneNUpGQBS4VxdtV4UuXXF
|
||||
/4TkzafJ/9cm2iurvUmMd6XKP9lw0mY5zp/E70WgTCBp4vUlVsU3H2tYbO+filYL
|
||||
3ntNx6nKTykX4/a/UJfj0t8as+zli+gNxNx/h+734V9dKdFG4Rl+2fTLpQKBgQCE
|
||||
qJkTEe+Q0wCOBEYICADupwqcWqwAXWDW7IrZdfVtulqYWwqecVIkmk+dPxWosc4d
|
||||
ekjz4nyNH0i+gC15LVebqdaAJ/T7aD4KXuW+nXNLMRfcJCGjgipRUruWD0EMEdqW
|
||||
rqBuGXMpXeH6VxGPgVkJVLvKC6tZZe9VM+pnvteuMQKBgQC8GaL+Lz+al4biyZBf
|
||||
JE8ekWrIotq/gfUBLP7x70+PB9bNtXtlgmTvjgYg4jiu3KR/ZIYYQ8vfVgkb6tDI
|
||||
rWGZw86Pzuoi1ppg/pYhKk9qrmCIT4HPEXbHl7ATahu2BOCIU3hybjTh2lB6LbX9
|
||||
8LMFlz1QPqSZYN/A/kOcLBfa3A==
|
||||
-----END PRIVATE KEY-----
|
||||
";
|
||||
const RS2: &str = "-----BEGIN PRIVATE KEY-----
|
||||
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDipm6FIKSRab3J
|
||||
HwmK18t7hp+pohllxIDUSPi7S5mIhN/JG2Plq2Lp746E/fuT8dcBF2R4sJlG2L0J
|
||||
zmxOvBU/i/sQF9s1i4CEfg05k2//gKENIEsF3pMMmrH+mcZi0TTD6rezHpdVxPHk
|
||||
qWxSyOCtIJV29X+wxPwAB59kQFHzy2ooPB1isZcpE8tO0KthAM+oZ3KuCwE0++cO
|
||||
IWLeq9aPwyKhtip/xjTMxd1kzdKh592mGSyzr9D0QSWOYFGvgJXANDdiPdhSSOLt
|
||||
ECWPNPlm2FQvGGvYYBafUqz7VumKHE6x8J6lKdYa2J0ZdDzCIo2IHzlxe+RZNgwy
|
||||
uAD2jhVxAgMBAAECggEAbsZHWBu3MzcKQiVARbLoygvnN0J5xUqAaMDtiKUPejDv
|
||||
K1yOu67DXnDuKEP2VL2rhuYG/hHaKE1AP227c9PrUq6424m9YvM2sgrlrdFIuQkG
|
||||
LeMtp8W7+zoUasp/ssZrUqICfLIj5xCl5UuFHQT/Ar7dLlIYwa3VOLKBDb9+Dnfe
|
||||
QH5/So4uMXG6vw34JN9jf+eAc8Yt0PeIz62ycvRwdpTJQ0MxZN9ZKpCAQp+VTuXT
|
||||
zlzNvDMilabEdqUvAyGyz8lBLNl0wdaVrqPqAEWM5U45QXsdFZknWammP7/tijeX
|
||||
0z+Bi0J0uSEU5X502zm7GArj/NNIiWMcjmDjwUUhwQKBgQD9C2GoqxOxuVPYqwYR
|
||||
+Jz7f2qMjlSP8adA5Lzuh8UKXDp8JCEQC8ryweLzaOKS9C5MAw+W4W2wd4nJoQI1
|
||||
P1dgGvBlfvEeRHMgqWtq7FuTsjSe7e0uSEkC4ngDb4sc0QOpv15cMuEz+4+aFLPL
|
||||
x29EcHWAaBX+rkid3zpQHFU4eQKBgQDlTCEqRuXwwa3V+Sq+mNWzD9QIGtD87TH/
|
||||
FPO/Ij/cK2+GISgFDqhetiGTH4qrvPL0psPT+iH5zGFYcoFmTtwLdWQJdxhxz0bg
|
||||
iX/AceyX5e1Bm+ThT36sU83NrxKPkrdk6jNmr2iUF1OTzTwUKOYdHOPZqdMPfF4M
|
||||
4XAaWVT2uQKBgQD4nKcNdU+7LE9Rr+4d1/o8Klp/0BMK/ayK2HE7lc8kt6qKb2DA
|
||||
iCWUTqPw7Fq3cQrPia5WWhNP7pJEtFkcAaiR9sW7onW5fBz0uR+dhK0QtmR2xWJj
|
||||
N4fsOp8ZGQ0/eae0rh1CTobucLkM9EwV6VLLlgYL67e4anlUCo8bSEr+WQKBgQCB
|
||||
uf6RgqcY/RqyklPCnYlZ0zyskS9nyXKd1GbK3j+u+swP4LZZlh9f5j88k33LCA2U
|
||||
qLzmMwAB6cWxWqcnELqhqPq9+ClWSmTZKDGk2U936NfAZMirSGRsbsVi9wfTPriP
|
||||
WYlXMSpDjqb0WgsBhNob4npubQxCGKTFOM5Jufy90QKBgB0Lte1jX144uaXx6dtB
|
||||
rjXNuWNir0Jy31wHnQuCA+XnfUgPcrKmRLm8taMbXgZwxkNvgFkpUWU8aPEK08Ne
|
||||
X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
|
||||
5JiconnI5aLek0QVPoFaVXFa
|
||||
-----END PRIVATE KEY-----
|
||||
";
|
||||
|
||||
#[tokio::test]
|
||||
async fn renew() {
|
||||
let (rs1, jwk1) = new_rsa_jwk("1".into());
|
||||
let (rs2, jwk2) = new_rsa_jwk("2".into());
|
||||
let (rs1, jwk1) = new_rsa_jwk(RS1, "1".into());
|
||||
let (rs2, jwk2) = new_rsa_jwk(RS2, "2".into());
|
||||
let (ec1, jwk3) = new_ec_jwk("3".into());
|
||||
let (ec2, jwk4) = new_ec_jwk("4".into());
|
||||
|
||||
|
||||
@@ -613,40 +613,6 @@ mod tests {
|
||||
tmpdir.close().unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn verify_parquet_min_compression() {
|
||||
let tmpdir = camino_tempfile::tempdir().unwrap();
|
||||
|
||||
let config = ParquetConfig {
|
||||
propeties: Arc::new(
|
||||
WriterProperties::builder()
|
||||
.set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::default()))
|
||||
.build(),
|
||||
),
|
||||
rows_per_group: 2_000,
|
||||
file_size: 1_000_000,
|
||||
max_duration: time::Duration::from_secs(20 * 60),
|
||||
test_remote_failures: 0,
|
||||
};
|
||||
|
||||
let rx = random_stream(50_000);
|
||||
let file_stats = run_test(tmpdir.path(), config, rx).await;
|
||||
|
||||
// with compression, there are fewer files with more rows per file
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1223214, 5, 10000),
|
||||
(1229364, 5, 10000),
|
||||
(1231158, 5, 10000),
|
||||
(1230520, 5, 10000),
|
||||
(1221798, 5, 10000)
|
||||
]
|
||||
);
|
||||
|
||||
tmpdir.close().unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn verify_parquet_strong_compression() {
|
||||
let tmpdir = camino_tempfile::tempdir().unwrap();
|
||||
|
||||
@@ -4,8 +4,8 @@ use lasso::ThreadedRodeo;
|
||||
use measured::{
|
||||
label::{FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet},
|
||||
metric::{histogram::Thresholds, name::MetricName},
|
||||
Counter, CounterVec, FixedCardinalityLabel, Gauge, GaugeVec, Histogram, HistogramVec,
|
||||
LabelGroup, MetricGroup,
|
||||
Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
|
||||
MetricGroup,
|
||||
};
|
||||
use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLog, HyperLogLogVec};
|
||||
|
||||
@@ -548,6 +548,7 @@ pub enum RedisEventsCount {
|
||||
}
|
||||
|
||||
pub struct ThreadPoolWorkers(usize);
|
||||
#[derive(Copy, Clone)]
|
||||
pub struct ThreadPoolWorkerId(pub usize);
|
||||
|
||||
impl LabelValue for ThreadPoolWorkerId {
|
||||
@@ -613,9 +614,6 @@ impl FixedCardinalitySet for ThreadPoolWorkers {
|
||||
#[derive(MetricGroup)]
|
||||
#[metric(new(workers: usize))]
|
||||
pub struct ThreadPoolMetrics {
|
||||
pub injector_queue_depth: Gauge,
|
||||
#[metric(init = GaugeVec::with_label_set(ThreadPoolWorkers(workers)))]
|
||||
pub worker_queue_depth: GaugeVec<ThreadPoolWorkers>,
|
||||
#[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
|
||||
pub worker_task_turns_total: CounterVec<ThreadPoolWorkers>,
|
||||
#[metric(init = CounterVec::with_label_set(ThreadPoolWorkers(workers)))]
|
||||
|
||||
@@ -83,10 +83,10 @@ mod tests {
|
||||
let mut ids = vec![];
|
||||
|
||||
for _ in 0..n {
|
||||
// number of insert operations
|
||||
let n = rng.gen_range(1..100);
|
||||
// number to insert at once
|
||||
let m = rng.gen_range(1..4096);
|
||||
let n = rng.gen_range(1..4096);
|
||||
// number of insert operations
|
||||
let m = rng.gen_range(1..100);
|
||||
|
||||
let id = uuid::Builder::from_random_bytes(rng.gen()).into_uuid();
|
||||
ids.push((id, n, m));
|
||||
@@ -102,17 +102,11 @@ mod tests {
|
||||
let mut ids2 = ids.clone();
|
||||
while !ids2.is_empty() {
|
||||
ids2.shuffle(&mut rng);
|
||||
|
||||
let mut i = 0;
|
||||
while i < ids2.len() {
|
||||
sketch.inc_and_return(&ids2[i].0, ids2[i].1);
|
||||
ids2[i].2 -= 1;
|
||||
if ids2[i].2 == 0 {
|
||||
ids2.remove(i);
|
||||
} else {
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
ids2.retain_mut(|id| {
|
||||
sketch.inc_and_return(&id.0, id.1);
|
||||
id.2 -= 1;
|
||||
id.2 > 0
|
||||
});
|
||||
}
|
||||
|
||||
let mut within_p = 0;
|
||||
@@ -144,8 +138,8 @@ mod tests {
|
||||
// probably numbers are too small to truly represent the probabilities.
|
||||
assert_eq!(eval_precision(100, 4096.0, 0.90), 100);
|
||||
assert_eq!(eval_precision(1000, 4096.0, 0.90), 1000);
|
||||
assert_eq!(eval_precision(100, 4096.0, 0.1), 98);
|
||||
assert_eq!(eval_precision(1000, 4096.0, 0.1), 991);
|
||||
assert_eq!(eval_precision(100, 4096.0, 0.1), 96);
|
||||
assert_eq!(eval_precision(1000, 4096.0, 0.1), 988);
|
||||
}
|
||||
|
||||
// returns memory usage in bytes, and the time complexity per insert.
|
||||
|
||||
@@ -75,7 +75,7 @@ mod tests {
|
||||
let salt = b"sodium chloride";
|
||||
let pass = b"Ne0n_!5_50_C007";
|
||||
|
||||
let mut job = Pbkdf2::start(pass, salt, 600000);
|
||||
let mut job = Pbkdf2::start(pass, salt, 60000);
|
||||
let hash = loop {
|
||||
let std::task::Poll::Ready(hash) = job.turn() else {
|
||||
continue;
|
||||
@@ -83,7 +83,7 @@ mod tests {
|
||||
break hash;
|
||||
};
|
||||
|
||||
let expected = pbkdf2_hmac_array::<Sha256, 32>(pass, salt, 600000);
|
||||
let expected = pbkdf2_hmac_array::<Sha256, 32>(pass, salt, 60000);
|
||||
assert_eq!(hash, expected);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,17 +4,19 @@
|
||||
//! 1. Fairness per endpoint.
|
||||
//! 2. Yield support for high iteration counts.
|
||||
|
||||
use std::sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
use std::{
|
||||
cell::RefCell,
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc, Weak,
|
||||
},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use crossbeam_deque::{Injector, Stealer, Worker};
|
||||
use itertools::Itertools;
|
||||
use parking_lot::{Condvar, Mutex};
|
||||
use rand::Rng;
|
||||
use rand::{rngs::SmallRng, SeedableRng};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use crate::{
|
||||
intern::EndpointIdInt,
|
||||
@@ -25,273 +27,146 @@ use crate::{
|
||||
use super::pbkdf2::Pbkdf2;
|
||||
|
||||
pub struct ThreadPool {
|
||||
queue: Injector<JobSpec>,
|
||||
stealers: Vec<Stealer<JobSpec>>,
|
||||
parkers: Vec<(Condvar, Mutex<ThreadState>)>,
|
||||
/// bitpacked representation.
|
||||
/// lower 8 bits = number of sleeping threads
|
||||
/// next 8 bits = number of idle threads (searching for work)
|
||||
counters: AtomicU64,
|
||||
|
||||
runtime: Option<tokio::runtime::Runtime>,
|
||||
pub metrics: Arc<ThreadPoolMetrics>,
|
||||
}
|
||||
|
||||
#[derive(PartialEq)]
|
||||
enum ThreadState {
|
||||
Parked,
|
||||
Active,
|
||||
/// How often to reset the sketch values
|
||||
const SKETCH_RESET_INTERVAL: u64 = 1021;
|
||||
|
||||
thread_local! {
|
||||
static STATE: RefCell<Option<ThreadRt>> = const { RefCell::new(None) };
|
||||
}
|
||||
|
||||
impl ThreadPool {
|
||||
pub fn new(n_workers: u8) -> Arc<Self> {
|
||||
let workers = (0..n_workers).map(|_| Worker::new_fifo()).collect_vec();
|
||||
let stealers = workers.iter().map(|w| w.stealer()).collect_vec();
|
||||
// rayon would be nice here, but yielding in rayon does not work well afaict.
|
||||
|
||||
let parkers = (0..n_workers)
|
||||
.map(|_| (Condvar::new(), Mutex::new(ThreadState::Active)))
|
||||
.collect_vec();
|
||||
Arc::new_cyclic(|pool| {
|
||||
let pool = pool.clone();
|
||||
let worker_id = AtomicUsize::new(0);
|
||||
|
||||
let pool = Arc::new(Self {
|
||||
queue: Injector::new(),
|
||||
stealers,
|
||||
parkers,
|
||||
// threads start searching for work
|
||||
counters: AtomicU64::new((n_workers as u64) << 8),
|
||||
metrics: Arc::new(ThreadPoolMetrics::new(n_workers as usize)),
|
||||
});
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(n_workers as usize)
|
||||
.on_thread_start(move || {
|
||||
STATE.with_borrow_mut(|state| {
|
||||
*state = Some(ThreadRt {
|
||||
pool: pool.clone(),
|
||||
id: ThreadPoolWorkerId(worker_id.fetch_add(1, Ordering::Relaxed)),
|
||||
rng: SmallRng::from_entropy(),
|
||||
// used to determine whether we should temporarily skip tasks for fairness.
|
||||
// 99% of estimates will overcount by no more than 4096 samples
|
||||
countmin: CountMinSketch::with_params(
|
||||
1.0 / (SKETCH_RESET_INTERVAL as f64),
|
||||
0.01,
|
||||
),
|
||||
tick: 0,
|
||||
});
|
||||
});
|
||||
})
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
for (i, worker) in workers.into_iter().enumerate() {
|
||||
let pool = Arc::clone(&pool);
|
||||
std::thread::spawn(move || thread_rt(pool, worker, i));
|
||||
}
|
||||
|
||||
pool
|
||||
Self {
|
||||
runtime: Some(runtime),
|
||||
metrics: Arc::new(ThreadPoolMetrics::new(n_workers as usize)),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn spawn_job(
|
||||
&self,
|
||||
endpoint: EndpointIdInt,
|
||||
pbkdf2: Pbkdf2,
|
||||
) -> oneshot::Receiver<[u8; 32]> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let queue_was_empty = self.queue.is_empty();
|
||||
|
||||
self.metrics.injector_queue_depth.inc();
|
||||
self.queue.push(JobSpec {
|
||||
response: tx,
|
||||
pbkdf2,
|
||||
endpoint,
|
||||
});
|
||||
|
||||
// inspired from <https://github.com/rayon-rs/rayon/blob/3e3962cb8f7b50773bcc360b48a7a674a53a2c77/rayon-core/src/sleep/mod.rs#L242>
|
||||
let counts = self.counters.load(Ordering::SeqCst);
|
||||
let num_awake_but_idle = (counts >> 8) & 0xff;
|
||||
let num_sleepers = counts & 0xff;
|
||||
|
||||
// If the queue is non-empty, then we always wake up a worker
|
||||
// -- clearly the existing idle jobs aren't enough. Otherwise,
|
||||
// check to see if we have enough idle workers.
|
||||
if !queue_was_empty || num_awake_but_idle == 0 {
|
||||
let num_to_wake = Ord::min(1, num_sleepers);
|
||||
self.wake_any_threads(num_to_wake);
|
||||
}
|
||||
|
||||
rx
|
||||
}
|
||||
|
||||
#[cold]
|
||||
fn wake_any_threads(&self, mut num_to_wake: u64) {
|
||||
if num_to_wake > 0 {
|
||||
for i in 0..self.parkers.len() {
|
||||
if self.wake_specific_thread(i) {
|
||||
num_to_wake -= 1;
|
||||
if num_to_wake == 0 {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn wake_specific_thread(&self, index: usize) -> bool {
|
||||
let (condvar, lock) = &self.parkers[index];
|
||||
|
||||
let mut state = lock.lock();
|
||||
if *state == ThreadState::Parked {
|
||||
condvar.notify_one();
|
||||
|
||||
// When the thread went to sleep, it will have incremented
|
||||
// this value. When we wake it, its our job to decrement
|
||||
// it. We could have the thread do it, but that would
|
||||
// introduce a delay between when the thread was
|
||||
// *notified* and when this counter was decremented. That
|
||||
// might mislead people with new work into thinking that
|
||||
// there are sleeping threads that they should try to
|
||||
// wake, when in fact there is nothing left for them to
|
||||
// do.
|
||||
self.counters.fetch_sub(1, Ordering::SeqCst);
|
||||
*state = ThreadState::Active;
|
||||
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fn steal(&self, rng: &mut impl Rng, skip: usize, worker: &Worker<JobSpec>) -> Option<JobSpec> {
|
||||
// announce thread as idle
|
||||
self.counters.fetch_add(256, Ordering::SeqCst);
|
||||
|
||||
// try steal from the global queue
|
||||
loop {
|
||||
match self.queue.steal_batch_and_pop(worker) {
|
||||
crossbeam_deque::Steal::Success(job) => {
|
||||
self.metrics
|
||||
.injector_queue_depth
|
||||
.set(self.queue.len() as i64);
|
||||
// no longer idle
|
||||
self.counters.fetch_sub(256, Ordering::SeqCst);
|
||||
return Some(job);
|
||||
}
|
||||
crossbeam_deque::Steal::Retry => continue,
|
||||
crossbeam_deque::Steal::Empty => break,
|
||||
}
|
||||
}
|
||||
|
||||
// try steal from our neighbours
|
||||
loop {
|
||||
let mut retry = false;
|
||||
let start = rng.gen_range(0..self.stealers.len());
|
||||
let job = (start..self.stealers.len())
|
||||
.chain(0..start)
|
||||
.filter(|i| *i != skip)
|
||||
.find_map(
|
||||
|victim| match self.stealers[victim].steal_batch_and_pop(worker) {
|
||||
crossbeam_deque::Steal::Success(job) => Some(job),
|
||||
crossbeam_deque::Steal::Empty => None,
|
||||
crossbeam_deque::Steal::Retry => {
|
||||
retry = true;
|
||||
None
|
||||
}
|
||||
},
|
||||
);
|
||||
if job.is_some() {
|
||||
// no longer idle
|
||||
self.counters.fetch_sub(256, Ordering::SeqCst);
|
||||
return job;
|
||||
}
|
||||
if !retry {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
) -> tokio::task::JoinHandle<[u8; 32]> {
|
||||
self.runtime
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.spawn(JobSpec { pbkdf2, endpoint })
|
||||
}
|
||||
}
|
||||
|
||||
fn thread_rt(pool: Arc<ThreadPool>, worker: Worker<JobSpec>, index: usize) {
|
||||
/// interval when we should steal from the global queue
|
||||
/// so that tail latencies are managed appropriately
|
||||
const STEAL_INTERVAL: usize = 61;
|
||||
impl Drop for ThreadPool {
|
||||
fn drop(&mut self) {
|
||||
self.runtime.take().unwrap().shutdown_background();
|
||||
}
|
||||
}
|
||||
|
||||
/// How often to reset the sketch values
|
||||
const SKETCH_RESET_INTERVAL: usize = 1021;
|
||||
struct ThreadRt {
|
||||
pool: Weak<ThreadPool>,
|
||||
id: ThreadPoolWorkerId,
|
||||
rng: SmallRng,
|
||||
countmin: CountMinSketch,
|
||||
tick: u64,
|
||||
}
|
||||
|
||||
let mut rng = SmallRng::from_entropy();
|
||||
impl ThreadRt {
|
||||
fn should_run(&mut self, job: &JobSpec) -> bool {
|
||||
let rate = self
|
||||
.countmin
|
||||
.inc_and_return(&job.endpoint, job.pbkdf2.cost());
|
||||
|
||||
// used to determine whether we should temporarily skip tasks for fairness.
|
||||
// 99% of estimates will overcount by no more than 4096 samples
|
||||
let mut sketch = CountMinSketch::with_params(1.0 / (SKETCH_RESET_INTERVAL as f64), 0.01);
|
||||
|
||||
let (condvar, lock) = &pool.parkers[index];
|
||||
|
||||
'wait: loop {
|
||||
// wait for notification of work
|
||||
{
|
||||
let mut lock = lock.lock();
|
||||
|
||||
// queue is empty
|
||||
pool.metrics
|
||||
.worker_queue_depth
|
||||
.set(ThreadPoolWorkerId(index), 0);
|
||||
|
||||
// subtract 1 from idle count, add 1 to sleeping count.
|
||||
pool.counters.fetch_sub(255, Ordering::SeqCst);
|
||||
|
||||
*lock = ThreadState::Parked;
|
||||
condvar.wait(&mut lock);
|
||||
}
|
||||
|
||||
for i in 0.. {
|
||||
let Some(mut job) = worker
|
||||
.pop()
|
||||
.or_else(|| pool.steal(&mut rng, index, &worker))
|
||||
else {
|
||||
continue 'wait;
|
||||
};
|
||||
|
||||
pool.metrics
|
||||
.worker_queue_depth
|
||||
.set(ThreadPoolWorkerId(index), worker.len() as i64);
|
||||
|
||||
// receiver is closed, cancel the task
|
||||
if !job.response.is_closed() {
|
||||
let rate = sketch.inc_and_return(&job.endpoint, job.pbkdf2.cost());
|
||||
|
||||
const P: f64 = 2000.0;
|
||||
// probability decreases as rate increases.
|
||||
// lower probability, higher chance of being skipped
|
||||
//
|
||||
// estimates (rate in terms of 4096 rounds):
|
||||
// rate = 0 => probability = 100%
|
||||
// rate = 10 => probability = 71.3%
|
||||
// rate = 50 => probability = 62.1%
|
||||
// rate = 500 => probability = 52.3%
|
||||
// rate = 1021 => probability = 49.8%
|
||||
//
|
||||
// My expectation is that the pool queue will only begin backing up at ~1000rps
|
||||
// in which case the SKETCH_RESET_INTERVAL represents 1 second. Thus, the rates above
|
||||
// are in requests per second.
|
||||
let probability = P.ln() / (P + rate as f64).ln();
|
||||
if pool.queue.len() > 32 || rng.gen_bool(probability) {
|
||||
pool.metrics
|
||||
.worker_task_turns_total
|
||||
.inc(ThreadPoolWorkerId(index));
|
||||
|
||||
match job.pbkdf2.turn() {
|
||||
std::task::Poll::Ready(result) => {
|
||||
let _ = job.response.send(result);
|
||||
}
|
||||
std::task::Poll::Pending => worker.push(job),
|
||||
}
|
||||
} else {
|
||||
pool.metrics
|
||||
.worker_task_skips_total
|
||||
.inc(ThreadPoolWorkerId(index));
|
||||
|
||||
// skip for now
|
||||
worker.push(job);
|
||||
}
|
||||
}
|
||||
|
||||
// if we get stuck with a few long lived jobs in the queue
|
||||
// it's better to try and steal from the queue too for fairness
|
||||
if i % STEAL_INTERVAL == 0 {
|
||||
let _ = pool.queue.steal_batch(&worker);
|
||||
}
|
||||
|
||||
if i % SKETCH_RESET_INTERVAL == 0 {
|
||||
sketch.reset();
|
||||
}
|
||||
}
|
||||
const P: f64 = 2000.0;
|
||||
// probability decreases as rate increases.
|
||||
// lower probability, higher chance of being skipped
|
||||
//
|
||||
// estimates (rate in terms of 4096 rounds):
|
||||
// rate = 0 => probability = 100%
|
||||
// rate = 10 => probability = 71.3%
|
||||
// rate = 50 => probability = 62.1%
|
||||
// rate = 500 => probability = 52.3%
|
||||
// rate = 1021 => probability = 49.8%
|
||||
//
|
||||
// My expectation is that the pool queue will only begin backing up at ~1000rps
|
||||
// in which case the SKETCH_RESET_INTERVAL represents 1 second. Thus, the rates above
|
||||
// are in requests per second.
|
||||
let probability = P.ln() / (P + rate as f64).ln();
|
||||
self.rng.gen_bool(probability)
|
||||
}
|
||||
}
|
||||
|
||||
struct JobSpec {
|
||||
response: oneshot::Sender<[u8; 32]>,
|
||||
pbkdf2: Pbkdf2,
|
||||
endpoint: EndpointIdInt,
|
||||
}
|
||||
|
||||
impl Future for JobSpec {
|
||||
type Output = [u8; 32];
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
STATE.with_borrow_mut(|state| {
|
||||
let state = state.as_mut().expect("should be set on thread startup");
|
||||
|
||||
state.tick = state.tick.wrapping_add(1);
|
||||
if state.tick % SKETCH_RESET_INTERVAL == 0 {
|
||||
state.countmin.reset();
|
||||
}
|
||||
|
||||
if state.should_run(&self) {
|
||||
if let Some(pool) = state.pool.upgrade() {
|
||||
pool.metrics.worker_task_turns_total.inc(state.id);
|
||||
}
|
||||
|
||||
match self.pbkdf2.turn() {
|
||||
Poll::Ready(result) => Poll::Ready(result),
|
||||
// more to do, we shall requeue
|
||||
Poll::Pending => {
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if let Some(pool) = state.pool.upgrade() {
|
||||
pool.metrics.worker_task_skips_total.inc(state.id);
|
||||
}
|
||||
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Pending
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::EndpointId;
|
||||
|
||||
Reference in New Issue
Block a user