reduce cancel map contention (#5555)

## Problem

Every database request locks this cancel map rwlock. At high requests
per second this would have high contention

## Summary of changes

Switch to dashmap which has a sharded rwlock to reduce contention
This commit is contained in:
Conrad Ludgate
2023-10-23 14:12:41 +01:00
committed by GitHub
parent 41ee75bc71
commit 7d17f1719f

View File

@@ -1,5 +1,5 @@
use anyhow::{anyhow, Context};
use hashbrown::HashMap;
use anyhow::{bail, Context};
use dashmap::DashMap;
use pq_proto::CancelKeyData;
use std::net::SocketAddr;
use tokio::net::TcpStream;
@@ -8,7 +8,7 @@ use tracing::info;
/// Enables serving `CancelRequest`s.
#[derive(Default)]
pub struct CancelMap(parking_lot::RwLock<HashMap<CancelKeyData, Option<CancelClosure>>>);
pub struct CancelMap(DashMap<CancelKeyData, Option<CancelClosure>>);
impl CancelMap {
/// Cancel a running query for the corresponding connection.
@@ -16,7 +16,6 @@ impl CancelMap {
// NB: we should immediately release the lock after cloning the token.
let cancel_closure = self
.0
.read()
.get(&key)
.and_then(|x| x.clone())
.with_context(|| format!("query cancellation key not found: {key}"))?;
@@ -40,15 +39,19 @@ impl CancelMap {
// Random key collisions are unlikely to happen here, but they're still possible,
// which is why we have to take care not to rewrite an existing key.
self.0
.write()
.try_insert(key, None)
.map_err(|_| anyhow!("query cancellation key already exists: {key}"))?;
match self.0.entry(key) {
dashmap::mapref::entry::Entry::Occupied(_) => {
bail!("query cancellation key already exists: {key}")
}
dashmap::mapref::entry::Entry::Vacant(e) => {
e.insert(None);
}
}
// This will guarantee that the session gets dropped
// as soon as the future is finished.
scopeguard::defer! {
self.0.write().remove(&key);
self.0.remove(&key);
info!("dropped query cancellation key {key}");
}
@@ -59,12 +62,12 @@ impl CancelMap {
#[cfg(test)]
fn contains(&self, session: &Session) -> bool {
self.0.read().contains_key(&session.key)
self.0.contains_key(&session.key)
}
#[cfg(test)]
fn is_empty(&self) -> bool {
self.0.read().is_empty()
self.0.is_empty()
}
}
@@ -113,10 +116,7 @@ impl Session<'_> {
/// This enables query cancellation in `crate::proxy::prepare_client_connection`.
pub fn enable_query_cancellation(self, cancel_closure: CancelClosure) -> CancelKeyData {
info!("enabling query cancellation for this session");
self.cancel_map
.0
.write()
.insert(self.key, Some(cancel_closure));
self.cancel_map.0.insert(self.key, Some(cancel_closure));
self.key
}