diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index 8d16f202e9..a5eb3544b4 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -1,5 +1,5 @@ -use anyhow::{anyhow, Context}; -use hashbrown::HashMap; +use anyhow::{bail, Context}; +use dashmap::DashMap; use pq_proto::CancelKeyData; use std::net::SocketAddr; use tokio::net::TcpStream; @@ -8,7 +8,7 @@ use tracing::info; /// Enables serving `CancelRequest`s. #[derive(Default)] -pub struct CancelMap(parking_lot::RwLock>>); +pub struct CancelMap(DashMap>); impl CancelMap { /// Cancel a running query for the corresponding connection. @@ -16,7 +16,6 @@ impl CancelMap { // NB: we should immediately release the lock after cloning the token. let cancel_closure = self .0 - .read() .get(&key) .and_then(|x| x.clone()) .with_context(|| format!("query cancellation key not found: {key}"))?; @@ -40,15 +39,19 @@ impl CancelMap { // Random key collisions are unlikely to happen here, but they're still possible, // which is why we have to take care not to rewrite an existing key. - self.0 - .write() - .try_insert(key, None) - .map_err(|_| anyhow!("query cancellation key already exists: {key}"))?; + match self.0.entry(key) { + dashmap::mapref::entry::Entry::Occupied(_) => { + bail!("query cancellation key already exists: {key}") + } + dashmap::mapref::entry::Entry::Vacant(e) => { + e.insert(None); + } + } // This will guarantee that the session gets dropped // as soon as the future is finished. scopeguard::defer! { - self.0.write().remove(&key); + self.0.remove(&key); info!("dropped query cancellation key {key}"); } @@ -59,12 +62,12 @@ impl CancelMap { #[cfg(test)] fn contains(&self, session: &Session) -> bool { - self.0.read().contains_key(&session.key) + self.0.contains_key(&session.key) } #[cfg(test)] fn is_empty(&self) -> bool { - self.0.read().is_empty() + self.0.is_empty() } } @@ -113,10 +116,7 @@ impl Session<'_> { /// This enables query cancellation in `crate::proxy::prepare_client_connection`. pub fn enable_query_cancellation(self, cancel_closure: CancelClosure) -> CancelKeyData { info!("enabling query cancellation for this session"); - self.cancel_map - .0 - .write() - .insert(self.key, Some(cancel_closure)); + self.cancel_map.0.insert(self.key, Some(cancel_closure)); self.key }