chore: replace dashmap with clashmap (#10582)

## Problem

Because dashmap 6 switched to hashbrown RawTable API, it required us to
use unsafe code in the upgrade:
https://github.com/neondatabase/neon/pull/8107

## Summary of changes

Switch to clashmap, a fork maintained by me which removes much of the
unsafe and ultimately switches to HashTable instead of RawTable to
remove much of the unsafe requirement on us.
This commit is contained in:
Conrad Ludgate
2025-01-31 09:53:43 +00:00
committed by GitHub
parent 423e239617
commit 738bf83583
14 changed files with 97 additions and 48 deletions

53
Cargo.lock generated
View File

@@ -1213,6 +1213,20 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afb84c814227b90d6895e01398aee0d8033c00e7466aca416fb6a8e0eb19d8a7"
[[package]]
name = "clashmap"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93bd59c81e2bd87a775ae2de75f070f7e2bfe97363a6ad652f46824564c23e4d"
dependencies = [
"crossbeam-utils",
"hashbrown 0.15.2",
"lock_api",
"parking_lot_core 0.9.8",
"polonius-the-crab",
"replace_with",
]
[[package]]
name = "colorchoice"
version = "1.0.0"
@@ -2531,6 +2545,12 @@ dependencies = [
"allocator-api2",
]
[[package]]
name = "hashbrown"
version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289"
[[package]]
name = "hashlink"
version = "0.9.1"
@@ -2581,6 +2601,15 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46"
[[package]]
name = "higher-kinded-types"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "561985554c8b8d4808605c90a5f1979cc6c31a5d20b78465cd59501233c6678e"
dependencies = [
"never-say-never",
]
[[package]]
name = "hmac"
version = "0.12.1"
@@ -3544,6 +3573,12 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "never-say-never"
version = "6.6.666"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf5a574dadd7941adeaa71823ecba5e28331b8313fb2e1c6a5c7e5981ea53ad6"
[[package]]
name = "nix"
version = "0.25.1"
@@ -4421,6 +4456,16 @@ dependencies = [
"plotters-backend",
]
[[package]]
name = "polonius-the-crab"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e97ca2c89572ae41bbec1c99498251f87dd5a94e500c5ec19c382dd593dd5ce9"
dependencies = [
"higher-kinded-types",
"never-say-never",
]
[[package]]
name = "postgres"
version = "0.19.6"
@@ -4794,9 +4839,9 @@ dependencies = [
"camino-tempfile",
"chrono",
"clap",
"clashmap",
"compute_api",
"consumption_metrics",
"dashmap 5.5.0",
"ecdsa 0.16.9",
"ed25519-dalek",
"env_logger 0.10.2",
@@ -5215,6 +5260,12 @@ dependencies = [
"utils",
]
[[package]]
name = "replace_with"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3a8614ee435691de62bcffcf4a66d91b3594bf1428a5722e79103249a095690"
[[package]]
name = "reqwest"
version = "0.12.4"

View File

@@ -77,10 +77,10 @@ camino = "1.1.6"
cfg-if = "1.0.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
clap = { version = "4.0", features = ["derive", "env"] }
clashmap = { version = "1.0", features = ["raw-api"] }
comfy-table = "7.1"
const_format = "0.2"
crc32c = "0.6"
dashmap = { version = "5.5.0", features = ["raw-api"] }
diatomic-waker = { version = "0.2.3" }
either = "1.8"
enum-map = "2.4.2"

View File

@@ -24,9 +24,9 @@ bytes = { workspace = true, features = ["serde"] }
camino.workspace = true
chrono.workspace = true
clap = { workspace = true, features = ["derive", "env"] }
clashmap.workspace = true
compute_api.workspace = true
consumption_metrics.workspace = true
dashmap.workspace = true
env_logger.workspace = true
framed-websockets.workspace = true
futures.workspace = true

View File

@@ -4,7 +4,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};
use arc_swap::ArcSwapOption;
use dashmap::DashMap;
use clashmap::ClashMap;
use jose_jwk::crypto::KeyInfo;
use reqwest::{redirect, Client};
use reqwest_retry::policies::ExponentialBackoff;
@@ -64,7 +64,7 @@ pub(crate) struct AuthRule {
pub struct JwkCache {
client: reqwest_middleware::ClientWithMiddleware,
map: DashMap<(EndpointId, RoleName), Arc<JwkCacheEntryLock>>,
map: ClashMap<(EndpointId, RoleName), Arc<JwkCacheEntryLock>>,
}
pub(crate) struct JwkCacheEntry {
@@ -469,7 +469,7 @@ impl Default for JwkCache {
JwkCache {
client,
map: DashMap::default(),
map: ClashMap::default(),
}
}
}

View File

@@ -3,7 +3,7 @@ use std::future::pending;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use dashmap::DashSet;
use clashmap::ClashSet;
use redis::streams::{StreamReadOptions, StreamReadReply};
use redis::{AsyncCommands, FromRedisValue, Value};
use serde::Deserialize;
@@ -55,9 +55,9 @@ impl TryFrom<&Value> for ControlPlaneEvent {
pub struct EndpointsCache {
config: EndpointCacheConfig,
endpoints: DashSet<EndpointIdInt>,
branches: DashSet<BranchIdInt>,
projects: DashSet<ProjectIdInt>,
endpoints: ClashSet<EndpointIdInt>,
branches: ClashSet<BranchIdInt>,
projects: ClashSet<ProjectIdInt>,
ready: AtomicBool,
limiter: Arc<Mutex<GlobalRateLimiter>>,
}
@@ -69,9 +69,9 @@ impl EndpointsCache {
config.limiter_info.clone(),
))),
config,
endpoints: DashSet::new(),
branches: DashSet::new(),
projects: DashSet::new(),
endpoints: ClashSet::new(),
branches: ClashSet::new(),
projects: ClashSet::new(),
ready: AtomicBool::new(false),
}
}

View File

@@ -5,7 +5,7 @@ use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use dashmap::DashMap;
use clashmap::ClashMap;
use rand::{thread_rng, Rng};
use smol_str::SmolStr;
use tokio::sync::Mutex;
@@ -108,9 +108,9 @@ impl EndpointInfo {
/// One may ask, why the data is stored per project, when on the user request there is only data about the endpoint available?
/// On the cplane side updates are done per project (or per branch), so it's easier to invalidate the whole project cache.
pub struct ProjectInfoCacheImpl {
cache: DashMap<EndpointIdInt, EndpointInfo>,
cache: ClashMap<EndpointIdInt, EndpointInfo>,
project2ep: DashMap<ProjectIdInt, HashSet<EndpointIdInt>>,
project2ep: ClashMap<ProjectIdInt, HashSet<EndpointIdInt>>,
config: ProjectInfoCacheOptions,
start_time: Instant,
@@ -176,8 +176,8 @@ impl ProjectInfoCache for ProjectInfoCacheImpl {
impl ProjectInfoCacheImpl {
pub(crate) fn new(config: ProjectInfoCacheOptions) -> Self {
Self {
cache: DashMap::new(),
project2ep: DashMap::new(),
cache: ClashMap::new(),
project2ep: ClashMap::new(),
config,
ttl_disabled_since_us: AtomicU64::new(u64::MAX),
start_time: Instant::now(),
@@ -302,7 +302,7 @@ impl ProjectInfoCacheImpl {
let mut removed = 0;
let shard = self.project2ep.shards()[shard].write();
for (_, endpoints) in shard.iter() {
for endpoint in endpoints.get() {
for endpoint in endpoints {
self.cache.remove(endpoint);
removed += 1;
}

View File

@@ -1,3 +1,4 @@
use std::convert::Infallible;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
@@ -8,7 +9,7 @@ use pq_proto::CancelKeyData;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, info};
use crate::auth::backend::{BackendIpAllowlist, ComputeUserInfo};
@@ -17,14 +18,11 @@ use crate::config::ComputeConfig;
use crate::context::RequestContext;
use crate::error::ReportableError;
use crate::ext::LockExt;
use crate::metrics::CancelChannelSizeGuard;
use crate::metrics::{CancellationRequest, Metrics, RedisMsgKind};
use crate::metrics::{CancelChannelSizeGuard, CancellationRequest, Metrics, RedisMsgKind};
use crate::rate_limiter::LeakyBucketRateLimiter;
use crate::redis::keys::KeyPrefix;
use crate::redis::kv_ops::RedisKVClient;
use crate::tls::postgres_rustls::MakeRustlsConnect;
use std::convert::Infallible;
use tokio::sync::oneshot;
type IpSubnetKey = IpNet;

View File

@@ -6,7 +6,7 @@ use std::hash::Hash;
use std::sync::Arc;
use std::time::Duration;
use dashmap::DashMap;
use clashmap::ClashMap;
use tokio::time::Instant;
use tracing::{debug, info};
@@ -148,7 +148,7 @@ impl ApiCaches {
/// Various caches for [`control_plane`](super).
pub struct ApiLocks<K> {
name: &'static str,
node_locks: DashMap<K, Arc<DynamicLimiter>>,
node_locks: ClashMap<K, Arc<DynamicLimiter>>,
config: RateLimiterConfig,
timeout: Duration,
epoch: std::time::Duration,
@@ -180,7 +180,7 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
) -> prometheus::Result<Self> {
Ok(Self {
name,
node_locks: DashMap::with_shard_amount(shards),
node_locks: ClashMap::with_shard_amount(shards),
config,
timeout,
epoch,
@@ -238,7 +238,7 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
let mut lock = shard.write();
let timer = self.metrics.reclamation_lag_seconds.start_timer();
let count = lock
.extract_if(|_, semaphore| Arc::strong_count(semaphore.get_mut()) == 1)
.extract_if(|(_, semaphore)| Arc::strong_count(semaphore) == 1)
.count();
drop(lock);
self.metrics.semaphores_unregistered.inc_by(count as u64);

View File

@@ -2,7 +2,7 @@ use std::hash::Hash;
use std::sync::atomic::{AtomicUsize, Ordering};
use ahash::RandomState;
use dashmap::DashMap;
use clashmap::ClashMap;
use rand::{thread_rng, Rng};
use tokio::time::Instant;
use tracing::info;
@@ -14,7 +14,7 @@ use crate::intern::EndpointIdInt;
pub type EndpointRateLimiter = LeakyBucketRateLimiter<EndpointIdInt>;
pub struct LeakyBucketRateLimiter<Key> {
map: DashMap<Key, LeakyBucketState, RandomState>,
map: ClashMap<Key, LeakyBucketState, RandomState>,
config: utils::leaky_bucket::LeakyBucketConfig,
access_count: AtomicUsize,
}
@@ -27,7 +27,7 @@ impl<K: Hash + Eq> LeakyBucketRateLimiter<K> {
pub fn new_with_shards(config: LeakyBucketConfig, shards: usize) -> Self {
Self {
map: DashMap::with_hasher_and_shard_amount(RandomState::new(), shards),
map: ClashMap::with_hasher_and_shard_amount(RandomState::new(), shards),
config: config.into(),
access_count: AtomicUsize::new(0),
}
@@ -58,7 +58,7 @@ impl<K: Hash + Eq> LeakyBucketRateLimiter<K> {
let shard = thread_rng().gen_range(0..n);
self.map.shards()[shard]
.write()
.retain(|_, value| !value.get().bucket_is_empty(now));
.retain(|(_, value)| !value.bucket_is_empty(now));
}
}

View File

@@ -5,7 +5,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use anyhow::bail;
use dashmap::DashMap;
use clashmap::ClashMap;
use itertools::Itertools;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
@@ -62,7 +62,7 @@ impl GlobalRateLimiter {
pub type WakeComputeRateLimiter = BucketRateLimiter<EndpointIdInt, StdRng, RandomState>;
pub struct BucketRateLimiter<Key, Rand = StdRng, Hasher = RandomState> {
map: DashMap<Key, Vec<RateBucket>, Hasher>,
map: ClashMap<Key, Vec<RateBucket>, Hasher>,
info: Cow<'static, [RateBucketInfo]>,
access_count: AtomicUsize,
rand: Mutex<Rand>,
@@ -202,7 +202,7 @@ impl<K: Hash + Eq, R: Rng, S: BuildHasher + Clone> BucketRateLimiter<K, R, S> {
info!(buckets = ?info, "endpoint rate limiter");
Self {
info,
map: DashMap::with_hasher_and_shard_amount(hasher, 64),
map: ClashMap::with_hasher_and_shard_amount(hasher, 64),
access_count: AtomicUsize::new(1), // start from 1 to avoid GC on the first request
rand: Mutex::new(rand),
}

View File

@@ -1,7 +1,8 @@
use std::io::ErrorKind;
use anyhow::Ok;
use pq_proto::{id_to_cancel_key, CancelKeyData};
use serde::{Deserialize, Serialize};
use std::io::ErrorKind;
pub mod keyspace {
pub const CANCEL_PREFIX: &str = "cancel";

View File

@@ -1,7 +1,6 @@
use redis::{AsyncCommands, ToRedisArgs};
use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
use crate::rate_limiter::{GlobalRateLimiter, RateBucketInfo};
pub struct RedisKVClient {

View File

@@ -5,7 +5,7 @@ use std::sync::atomic::{self, AtomicUsize};
use std::sync::{Arc, Weak};
use std::time::Duration;
use dashmap::DashMap;
use clashmap::ClashMap;
use parking_lot::RwLock;
use postgres_client::ReadyForQueryStatus;
use rand::Rng;
@@ -351,11 +351,11 @@ where
//
// That should be a fairly conteded map, so return reference to the per-endpoint
// pool as early as possible and release the lock.
pub(crate) global_pool: DashMap<EndpointCacheKey, Arc<RwLock<P>>>,
pub(crate) global_pool: ClashMap<EndpointCacheKey, Arc<RwLock<P>>>,
/// Number of endpoint-connection pools
///
/// [`DashMap::len`] iterates over all inner pools and acquires a read lock on each.
/// [`ClashMap::len`] iterates over all inner pools and acquires a read lock on each.
/// That seems like far too much effort, so we're using a relaxed increment counter instead.
/// It's only used for diagnostics.
pub(crate) global_pool_size: AtomicUsize,
@@ -396,7 +396,7 @@ where
pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc<Self> {
let shards = config.pool_options.pool_shards;
Arc::new(Self {
global_pool: DashMap::with_shard_amount(shards),
global_pool: ClashMap::with_shard_amount(shards),
global_pool_size: AtomicUsize::new(0),
config,
global_connections_count: Arc::new(AtomicUsize::new(0)),
@@ -442,10 +442,10 @@ where
.start_timer();
let current_len = shard.len();
let mut clients_removed = 0;
shard.retain(|endpoint, x| {
shard.retain(|(endpoint, x)| {
// if the current endpoint pool is unique (no other strong or weak references)
// then it is currently not in use by any connections.
if let Some(pool) = Arc::get_mut(x.get_mut()) {
if let Some(pool) = Arc::get_mut(x) {
let endpoints = pool.get_mut();
clients_removed = endpoints.clear_closed();

View File

@@ -10,9 +10,9 @@ use anyhow::{bail, Context};
use async_compression::tokio::write::GzipEncoder;
use bytes::Bytes;
use chrono::{DateTime, Datelike, Timelike, Utc};
use clashmap::mapref::entry::Entry;
use clashmap::ClashMap;
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use once_cell::sync::Lazy;
use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
use serde::{Deserialize, Serialize};
@@ -137,7 +137,7 @@ type FastHasher = std::hash::BuildHasherDefault<rustc_hash::FxHasher>;
#[derive(Default)]
pub(crate) struct Metrics {
endpoints: DashMap<Ids, Arc<MetricCounter>, FastHasher>,
endpoints: ClashMap<Ids, Arc<MetricCounter>, FastHasher>,
}
impl Metrics {
@@ -213,7 +213,7 @@ pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result<Infall
}
fn collect_and_clear_metrics<C: Clearable>(
endpoints: &DashMap<Ids, Arc<C>, FastHasher>,
endpoints: &ClashMap<Ids, Arc<C>, FastHasher>,
) -> Vec<(Ids, u64)> {
let mut metrics_to_clear = Vec::new();
@@ -271,7 +271,7 @@ fn create_event_chunks<'a>(
#[expect(clippy::too_many_arguments)]
#[instrument(skip_all)]
async fn collect_metrics_iteration(
endpoints: &DashMap<Ids, Arc<MetricCounter>, FastHasher>,
endpoints: &ClashMap<Ids, Arc<MetricCounter>, FastHasher>,
client: &http::ClientWithMiddleware,
metric_collection_endpoint: &reqwest::Url,
storage: Option<&GenericRemoteStorage>,