From 738bf835836de94e8aa41b8575c6db78cb882c38 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Fri, 31 Jan 2025 09:53:43 +0000 Subject: [PATCH] chore: replace dashmap with clashmap (#10582) ## Problem Because dashmap 6 switched to hashbrown RawTable API, it required us to use unsafe code in the upgrade: https://github.com/neondatabase/neon/pull/8107 ## Summary of changes Switch to clashmap, a fork maintained by me which removes much of the unsafe and ultimately switches to HashTable instead of RawTable to remove much of the unsafe requirement on us. --- Cargo.lock | 53 +++++++++++++++++++++++++- Cargo.toml | 2 +- proxy/Cargo.toml | 2 +- proxy/src/auth/backend/jwt.rs | 6 +-- proxy/src/cache/endpoints.rs | 14 +++---- proxy/src/cache/project_info.rs | 12 +++--- proxy/src/cancellation.rs | 8 ++-- proxy/src/control_plane/client/mod.rs | 8 ++-- proxy/src/rate_limiter/leaky_bucket.rs | 8 ++-- proxy/src/rate_limiter/limiter.rs | 6 +-- proxy/src/redis/keys.rs | 3 +- proxy/src/redis/kv_ops.rs | 1 - proxy/src/serverless/conn_pool_lib.rs | 12 +++--- proxy/src/usage_metrics.rs | 10 ++--- 14 files changed, 97 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 359f989a76..e9cbebcd02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1213,6 +1213,20 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "afb84c814227b90d6895e01398aee0d8033c00e7466aca416fb6a8e0eb19d8a7" +[[package]] +name = "clashmap" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93bd59c81e2bd87a775ae2de75f070f7e2bfe97363a6ad652f46824564c23e4d" +dependencies = [ + "crossbeam-utils", + "hashbrown 0.15.2", + "lock_api", + "parking_lot_core 0.9.8", + "polonius-the-crab", + "replace_with", +] + [[package]] name = "colorchoice" version = "1.0.0" @@ -2531,6 +2545,12 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "hashbrown" +version = "0.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" + [[package]] name = "hashlink" version = "0.9.1" @@ -2581,6 +2601,15 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46" +[[package]] +name = "higher-kinded-types" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "561985554c8b8d4808605c90a5f1979cc6c31a5d20b78465cd59501233c6678e" +dependencies = [ + "never-say-never", +] + [[package]] name = "hmac" version = "0.12.1" @@ -3544,6 +3573,12 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "never-say-never" +version = "6.6.666" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf5a574dadd7941adeaa71823ecba5e28331b8313fb2e1c6a5c7e5981ea53ad6" + [[package]] name = "nix" version = "0.25.1" @@ -4421,6 +4456,16 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "polonius-the-crab" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e97ca2c89572ae41bbec1c99498251f87dd5a94e500c5ec19c382dd593dd5ce9" +dependencies = [ + "higher-kinded-types", + "never-say-never", +] + [[package]] name = "postgres" version = "0.19.6" @@ -4794,9 +4839,9 @@ dependencies = [ "camino-tempfile", "chrono", "clap", + "clashmap", "compute_api", "consumption_metrics", - "dashmap 5.5.0", "ecdsa 0.16.9", "ed25519-dalek", "env_logger 0.10.2", @@ -5215,6 +5260,12 @@ dependencies = [ "utils", ] +[[package]] +name = "replace_with" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3a8614ee435691de62bcffcf4a66d91b3594bf1428a5722e79103249a095690" + [[package]] name = "reqwest" version = "0.12.4" diff --git a/Cargo.toml b/Cargo.toml index 9ccdb45f6d..9d15b78a93 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,10 +77,10 @@ camino = "1.1.6" cfg-if = "1.0.0" chrono = { version = "0.4", default-features = false, features = ["clock"] } clap = { version = "4.0", features = ["derive", "env"] } +clashmap = { version = "1.0", features = ["raw-api"] } comfy-table = "7.1" const_format = "0.2" crc32c = "0.6" -dashmap = { version = "5.5.0", features = ["raw-api"] } diatomic-waker = { version = "0.2.3" } either = "1.8" enum-map = "2.4.2" diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index f362a45035..35574e945c 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -24,9 +24,9 @@ bytes = { workspace = true, features = ["serde"] } camino.workspace = true chrono.workspace = true clap = { workspace = true, features = ["derive", "env"] } +clashmap.workspace = true compute_api.workspace = true consumption_metrics.workspace = true -dashmap.workspace = true env_logger.workspace = true framed-websockets.workspace = true futures.workspace = true diff --git a/proxy/src/auth/backend/jwt.rs b/proxy/src/auth/backend/jwt.rs index df716f8455..e05a693cee 100644 --- a/proxy/src/auth/backend/jwt.rs +++ b/proxy/src/auth/backend/jwt.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use std::time::{Duration, SystemTime}; use arc_swap::ArcSwapOption; -use dashmap::DashMap; +use clashmap::ClashMap; use jose_jwk::crypto::KeyInfo; use reqwest::{redirect, Client}; use reqwest_retry::policies::ExponentialBackoff; @@ -64,7 +64,7 @@ pub(crate) struct AuthRule { pub struct JwkCache { client: reqwest_middleware::ClientWithMiddleware, - map: DashMap<(EndpointId, RoleName), Arc>, + map: ClashMap<(EndpointId, RoleName), Arc>, } pub(crate) struct JwkCacheEntry { @@ -469,7 +469,7 @@ impl Default for JwkCache { JwkCache { client, - map: DashMap::default(), + map: ClashMap::default(), } } } diff --git a/proxy/src/cache/endpoints.rs b/proxy/src/cache/endpoints.rs index 0136446d6d..b5c42cd23d 100644 --- a/proxy/src/cache/endpoints.rs +++ b/proxy/src/cache/endpoints.rs @@ -3,7 +3,7 @@ use std::future::pending; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; -use dashmap::DashSet; +use clashmap::ClashSet; use redis::streams::{StreamReadOptions, StreamReadReply}; use redis::{AsyncCommands, FromRedisValue, Value}; use serde::Deserialize; @@ -55,9 +55,9 @@ impl TryFrom<&Value> for ControlPlaneEvent { pub struct EndpointsCache { config: EndpointCacheConfig, - endpoints: DashSet, - branches: DashSet, - projects: DashSet, + endpoints: ClashSet, + branches: ClashSet, + projects: ClashSet, ready: AtomicBool, limiter: Arc>, } @@ -69,9 +69,9 @@ impl EndpointsCache { config.limiter_info.clone(), ))), config, - endpoints: DashSet::new(), - branches: DashSet::new(), - projects: DashSet::new(), + endpoints: ClashSet::new(), + branches: ClashSet::new(), + projects: ClashSet::new(), ready: AtomicBool::new(false), } } diff --git a/proxy/src/cache/project_info.rs b/proxy/src/cache/project_info.rs index cab0b8b905..a5e71f1a87 100644 --- a/proxy/src/cache/project_info.rs +++ b/proxy/src/cache/project_info.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; -use dashmap::DashMap; +use clashmap::ClashMap; use rand::{thread_rng, Rng}; use smol_str::SmolStr; use tokio::sync::Mutex; @@ -108,9 +108,9 @@ impl EndpointInfo { /// One may ask, why the data is stored per project, when on the user request there is only data about the endpoint available? /// On the cplane side updates are done per project (or per branch), so it's easier to invalidate the whole project cache. pub struct ProjectInfoCacheImpl { - cache: DashMap, + cache: ClashMap, - project2ep: DashMap>, + project2ep: ClashMap>, config: ProjectInfoCacheOptions, start_time: Instant, @@ -176,8 +176,8 @@ impl ProjectInfoCache for ProjectInfoCacheImpl { impl ProjectInfoCacheImpl { pub(crate) fn new(config: ProjectInfoCacheOptions) -> Self { Self { - cache: DashMap::new(), - project2ep: DashMap::new(), + cache: ClashMap::new(), + project2ep: ClashMap::new(), config, ttl_disabled_since_us: AtomicU64::new(u64::MAX), start_time: Instant::now(), @@ -302,7 +302,7 @@ impl ProjectInfoCacheImpl { let mut removed = 0; let shard = self.project2ep.shards()[shard].write(); for (_, endpoints) in shard.iter() { - for endpoint in endpoints.get() { + for endpoint in endpoints { self.cache.remove(endpoint); removed += 1; } diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index 34f708a36b..9a0b954341 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -1,3 +1,4 @@ +use std::convert::Infallible; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; @@ -8,7 +9,7 @@ use pq_proto::CancelKeyData; use serde::{Deserialize, Serialize}; use thiserror::Error; use tokio::net::TcpStream; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; use tracing::{debug, info}; use crate::auth::backend::{BackendIpAllowlist, ComputeUserInfo}; @@ -17,14 +18,11 @@ use crate::config::ComputeConfig; use crate::context::RequestContext; use crate::error::ReportableError; use crate::ext::LockExt; -use crate::metrics::CancelChannelSizeGuard; -use crate::metrics::{CancellationRequest, Metrics, RedisMsgKind}; +use crate::metrics::{CancelChannelSizeGuard, CancellationRequest, Metrics, RedisMsgKind}; use crate::rate_limiter::LeakyBucketRateLimiter; use crate::redis::keys::KeyPrefix; use crate::redis::kv_ops::RedisKVClient; use crate::tls::postgres_rustls::MakeRustlsConnect; -use std::convert::Infallible; -use tokio::sync::oneshot; type IpSubnetKey = IpNet; diff --git a/proxy/src/control_plane/client/mod.rs b/proxy/src/control_plane/client/mod.rs index d559d96bbc..b879f3a59f 100644 --- a/proxy/src/control_plane/client/mod.rs +++ b/proxy/src/control_plane/client/mod.rs @@ -6,7 +6,7 @@ use std::hash::Hash; use std::sync::Arc; use std::time::Duration; -use dashmap::DashMap; +use clashmap::ClashMap; use tokio::time::Instant; use tracing::{debug, info}; @@ -148,7 +148,7 @@ impl ApiCaches { /// Various caches for [`control_plane`](super). pub struct ApiLocks { name: &'static str, - node_locks: DashMap>, + node_locks: ClashMap>, config: RateLimiterConfig, timeout: Duration, epoch: std::time::Duration, @@ -180,7 +180,7 @@ impl ApiLocks { ) -> prometheus::Result { Ok(Self { name, - node_locks: DashMap::with_shard_amount(shards), + node_locks: ClashMap::with_shard_amount(shards), config, timeout, epoch, @@ -238,7 +238,7 @@ impl ApiLocks { let mut lock = shard.write(); let timer = self.metrics.reclamation_lag_seconds.start_timer(); let count = lock - .extract_if(|_, semaphore| Arc::strong_count(semaphore.get_mut()) == 1) + .extract_if(|(_, semaphore)| Arc::strong_count(semaphore) == 1) .count(); drop(lock); self.metrics.semaphores_unregistered.inc_by(count as u64); diff --git a/proxy/src/rate_limiter/leaky_bucket.rs b/proxy/src/rate_limiter/leaky_bucket.rs index bff800f0a2..9645eaf725 100644 --- a/proxy/src/rate_limiter/leaky_bucket.rs +++ b/proxy/src/rate_limiter/leaky_bucket.rs @@ -2,7 +2,7 @@ use std::hash::Hash; use std::sync::atomic::{AtomicUsize, Ordering}; use ahash::RandomState; -use dashmap::DashMap; +use clashmap::ClashMap; use rand::{thread_rng, Rng}; use tokio::time::Instant; use tracing::info; @@ -14,7 +14,7 @@ use crate::intern::EndpointIdInt; pub type EndpointRateLimiter = LeakyBucketRateLimiter; pub struct LeakyBucketRateLimiter { - map: DashMap, + map: ClashMap, config: utils::leaky_bucket::LeakyBucketConfig, access_count: AtomicUsize, } @@ -27,7 +27,7 @@ impl LeakyBucketRateLimiter { pub fn new_with_shards(config: LeakyBucketConfig, shards: usize) -> Self { Self { - map: DashMap::with_hasher_and_shard_amount(RandomState::new(), shards), + map: ClashMap::with_hasher_and_shard_amount(RandomState::new(), shards), config: config.into(), access_count: AtomicUsize::new(0), } @@ -58,7 +58,7 @@ impl LeakyBucketRateLimiter { let shard = thread_rng().gen_range(0..n); self.map.shards()[shard] .write() - .retain(|_, value| !value.get().bucket_is_empty(now)); + .retain(|(_, value)| !value.bucket_is_empty(now)); } } diff --git a/proxy/src/rate_limiter/limiter.rs b/proxy/src/rate_limiter/limiter.rs index ec080f270b..ef6c39f230 100644 --- a/proxy/src/rate_limiter/limiter.rs +++ b/proxy/src/rate_limiter/limiter.rs @@ -5,7 +5,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Mutex; use anyhow::bail; -use dashmap::DashMap; +use clashmap::ClashMap; use itertools::Itertools; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; @@ -62,7 +62,7 @@ impl GlobalRateLimiter { pub type WakeComputeRateLimiter = BucketRateLimiter; pub struct BucketRateLimiter { - map: DashMap, Hasher>, + map: ClashMap, Hasher>, info: Cow<'static, [RateBucketInfo]>, access_count: AtomicUsize, rand: Mutex, @@ -202,7 +202,7 @@ impl BucketRateLimiter { info!(buckets = ?info, "endpoint rate limiter"); Self { info, - map: DashMap::with_hasher_and_shard_amount(hasher, 64), + map: ClashMap::with_hasher_and_shard_amount(hasher, 64), access_count: AtomicUsize::new(1), // start from 1 to avoid GC on the first request rand: Mutex::new(rand), } diff --git a/proxy/src/redis/keys.rs b/proxy/src/redis/keys.rs index dddc7e2054..dcb9a59f87 100644 --- a/proxy/src/redis/keys.rs +++ b/proxy/src/redis/keys.rs @@ -1,7 +1,8 @@ +use std::io::ErrorKind; + use anyhow::Ok; use pq_proto::{id_to_cancel_key, CancelKeyData}; use serde::{Deserialize, Serialize}; -use std::io::ErrorKind; pub mod keyspace { pub const CANCEL_PREFIX: &str = "cancel"; diff --git a/proxy/src/redis/kv_ops.rs b/proxy/src/redis/kv_ops.rs index dcc6aac51b..3689bf7ae2 100644 --- a/proxy/src/redis/kv_ops.rs +++ b/proxy/src/redis/kv_ops.rs @@ -1,7 +1,6 @@ use redis::{AsyncCommands, ToRedisArgs}; use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider; - use crate::rate_limiter::{GlobalRateLimiter, RateBucketInfo}; pub struct RedisKVClient { diff --git a/proxy/src/serverless/conn_pool_lib.rs b/proxy/src/serverless/conn_pool_lib.rs index 44eac77e8f..a300198de4 100644 --- a/proxy/src/serverless/conn_pool_lib.rs +++ b/proxy/src/serverless/conn_pool_lib.rs @@ -5,7 +5,7 @@ use std::sync::atomic::{self, AtomicUsize}; use std::sync::{Arc, Weak}; use std::time::Duration; -use dashmap::DashMap; +use clashmap::ClashMap; use parking_lot::RwLock; use postgres_client::ReadyForQueryStatus; use rand::Rng; @@ -351,11 +351,11 @@ where // // That should be a fairly conteded map, so return reference to the per-endpoint // pool as early as possible and release the lock. - pub(crate) global_pool: DashMap>>, + pub(crate) global_pool: ClashMap>>, /// Number of endpoint-connection pools /// - /// [`DashMap::len`] iterates over all inner pools and acquires a read lock on each. + /// [`ClashMap::len`] iterates over all inner pools and acquires a read lock on each. /// That seems like far too much effort, so we're using a relaxed increment counter instead. /// It's only used for diagnostics. pub(crate) global_pool_size: AtomicUsize, @@ -396,7 +396,7 @@ where pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc { let shards = config.pool_options.pool_shards; Arc::new(Self { - global_pool: DashMap::with_shard_amount(shards), + global_pool: ClashMap::with_shard_amount(shards), global_pool_size: AtomicUsize::new(0), config, global_connections_count: Arc::new(AtomicUsize::new(0)), @@ -442,10 +442,10 @@ where .start_timer(); let current_len = shard.len(); let mut clients_removed = 0; - shard.retain(|endpoint, x| { + shard.retain(|(endpoint, x)| { // if the current endpoint pool is unique (no other strong or weak references) // then it is currently not in use by any connections. - if let Some(pool) = Arc::get_mut(x.get_mut()) { + if let Some(pool) = Arc::get_mut(x) { let endpoints = pool.get_mut(); clients_removed = endpoints.clear_closed(); diff --git a/proxy/src/usage_metrics.rs b/proxy/src/usage_metrics.rs index e1cc7e87b4..d369e3742f 100644 --- a/proxy/src/usage_metrics.rs +++ b/proxy/src/usage_metrics.rs @@ -10,9 +10,9 @@ use anyhow::{bail, Context}; use async_compression::tokio::write::GzipEncoder; use bytes::Bytes; use chrono::{DateTime, Datelike, Timelike, Utc}; +use clashmap::mapref::entry::Entry; +use clashmap::ClashMap; use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE}; -use dashmap::mapref::entry::Entry; -use dashmap::DashMap; use once_cell::sync::Lazy; use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel}; use serde::{Deserialize, Serialize}; @@ -137,7 +137,7 @@ type FastHasher = std::hash::BuildHasherDefault; #[derive(Default)] pub(crate) struct Metrics { - endpoints: DashMap, FastHasher>, + endpoints: ClashMap, FastHasher>, } impl Metrics { @@ -213,7 +213,7 @@ pub async fn task_main(config: &MetricCollectionConfig) -> anyhow::Result( - endpoints: &DashMap, FastHasher>, + endpoints: &ClashMap, FastHasher>, ) -> Vec<(Ids, u64)> { let mut metrics_to_clear = Vec::new(); @@ -271,7 +271,7 @@ fn create_event_chunks<'a>( #[expect(clippy::too_many_arguments)] #[instrument(skip_all)] async fn collect_metrics_iteration( - endpoints: &DashMap, FastHasher>, + endpoints: &ClashMap, FastHasher>, client: &http::ClientWithMiddleware, metric_collection_endpoint: &reqwest::Url, storage: Option<&GenericRemoteStorage>,