Add request ID to io-in-progress locking table, to ease debugging

I also added INFO messages for when a backend blocks on the
io-in-progress lock. It's probably too noisy for production, but
useful for now to get a picture of how often it happens.
Author: Heikki Linnakangas
Date:   2025-07-04 15:30:56 +03:00
Parent: 3231cb6138
Commit: 5f2d476a58

2 changed files with 35 additions and 24 deletions

@@ -1,3 +1,6 @@
+//! Lock table to ensure that only one IO request is in flight for a given
+//! block (or relation or database metadata) at a time
+
 use std::cmp::Eq;
 use std::hash::Hash;
 use std::sync::Arc;
@@ -16,67 +19,75 @@ pub enum RequestInProgressKey {
     Block(RelTag, u32),
 }
 
-pub type RequestInProgressTable = MutexHashSet<RequestInProgressKey>;
+type RequestId = u64;
+
+pub type RequestInProgressTable = MutexHashMap<RequestInProgressKey, RequestId>;
 
 // more primitive locking thingie:
 
-pub struct MutexHashSet<K>
+pub struct MutexHashMap<K, V>
 where
     K: Clone + Eq + Hash,
 {
-    lock_table: ClashMap<K, Arc<Mutex<()>>>,
+    lock_table: ClashMap<K, (V, Arc<Mutex<()>>)>,
 }
 
-pub struct MutexHashSetGuard<'a, K>
+pub struct MutexHashMapGuard<'a, K, V>
 where
     K: Clone + Eq + Hash,
 {
     pub key: K,
-    set: &'a MutexHashSet<K>,
+    map: &'a MutexHashMap<K, V>,
     mutex: Arc<Mutex<()>>,
     _guard: OwnedMutexGuard<()>,
 }
 
-impl<'a, K> Drop for MutexHashSetGuard<'a, K>
+impl<'a, K, V> Drop for MutexHashMapGuard<'a, K, V>
 where
     K: Clone + Eq + Hash,
 {
     fn drop(&mut self) {
-        let (_old_key, old_val) = self.set.lock_table.remove(&self.key).unwrap();
-        assert!(Arc::ptr_eq(&old_val, &self.mutex));
+        let (_old_key, old_val) = self.map.lock_table.remove(&self.key).unwrap();
+        assert!(Arc::ptr_eq(&old_val.1, &self.mutex));
         // the guard will be dropped as we return
     }
 }
 
-impl<K> MutexHashSet<K>
+impl<K, V> MutexHashMap<K, V>
 where
     K: Clone + Eq + Hash,
+    V: std::fmt::Display + Copy,
 {
-    pub fn new() -> MutexHashSet<K> {
-        MutexHashSet {
+    pub fn new() -> MutexHashMap<K, V> {
+        MutexHashMap {
            lock_table: ClashMap::new(),
        }
    }
 
-    pub async fn lock<'a>(&'a self, key: K) -> MutexHashSetGuard<'a, K> {
+    pub async fn lock<'a>(&'a self, key: K, val: V) -> MutexHashMapGuard<'a, K, V> {
        let my_mutex = Arc::new(Mutex::new(()));
        let my_guard = Arc::clone(&my_mutex).lock_owned().await;
        loop {
-            let lock = match self.lock_table.entry(key.clone()) {
-                Entry::Occupied(e) => Arc::clone(e.get()),
+            let (request_id, lock) = match self.lock_table.entry(key.clone()) {
+                Entry::Occupied(e) => {
+                    let e = e.get();
+                    (e.0, Arc::clone(&e.1))
+                },
                Entry::Vacant(e) => {
-                    e.insert(Arc::clone(&my_mutex));
+                    e.insert((val, Arc::clone(&my_mutex)));
                    break;
                }
            };
+            tracing::info!("waiting for conflicting IO {request_id} to complete");
            let _ = lock.lock().await;
+            tracing::info!("conflicting IO {request_id} completed");
        }
 
-        MutexHashSetGuard {
+        MutexHashMapGuard {
            key,
-            set: self,
+            map: self,
            mutex: my_mutex,
            _guard: my_guard,
        }
    }
 }
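
For readers outside the codebase, here is a minimal, self-contained sketch of
the pattern this first file implements, under assumed names: LockTable,
LockGuard, and println! stand in for MutexHashMap, MutexHashMapGuard, and
tracing::info!, and a plain Mutex<HashMap<..>> stands in for ClashMap. It is
not the actual Neon code; it only demonstrates the point of the commit, namely
that the value stored next to each per-key mutex is the holder's request ID,
so a waiter can log which in-flight request it is blocked on.

    // Sketch only (assumed names, not the actual Neon implementation).
    // Assumes tokio = { version = "1", features = ["full"] } and Rust 2021.
    use std::collections::HashMap;
    use std::hash::Hash;
    use std::sync::{Arc, Mutex};
    use tokio::sync::{Mutex as AsyncMutex, OwnedMutexGuard};

    type RequestId = u64;

    struct LockTable<K: Clone + Eq + Hash> {
        inner: Mutex<HashMap<K, (RequestId, Arc<AsyncMutex<()>>)>>,
    }

    struct LockGuard<'a, K: Clone + Eq + Hash> {
        key: K,
        table: &'a LockTable<K>,
        _guard: OwnedMutexGuard<()>,
    }

    impl<K: Clone + Eq + Hash> Drop for LockGuard<'_, K> {
        fn drop(&mut self) {
            // Remove our entry first; _guard is released just afterwards,
            // waking any waiter, which then retries its own insert.
            self.table.inner.lock().unwrap().remove(&self.key);
        }
    }

    impl<K: Clone + Eq + Hash> LockTable<K> {
        fn new() -> Self {
            Self { inner: Mutex::new(HashMap::new()) }
        }

        async fn lock(&self, key: K, request_id: RequestId) -> LockGuard<'_, K> {
            // Pre-lock our own mutex, so that if we win the race to insert
            // it, later waiters block until our guard is dropped.
            let my_mutex = Arc::new(AsyncMutex::new(()));
            let my_guard = Arc::clone(&my_mutex).lock_owned().await;
            loop {
                // Scope the std-mutex guard so it isn't held across an await.
                let conflict = {
                    let mut table = self.inner.lock().unwrap();
                    if let Some((holder, lock)) = table.get(&key) {
                        Some((*holder, Arc::clone(lock)))
                    } else {
                        table.insert(key.clone(), (request_id, Arc::clone(&my_mutex)));
                        None
                    }
                };
                let Some((holder, lock)) = conflict else {
                    return LockGuard { key, table: self, _guard: my_guard };
                };
                // The diagnostic this commit adds: name the current holder.
                println!("request {request_id}: waiting for conflicting IO {holder} to complete");
                let _ = lock.lock().await;
                println!("request {request_id}: conflicting IO {holder} completed");
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let table = Arc::new(LockTable::new());
        let t = Arc::clone(&table);
        let task = tokio::spawn(async move {
            let _g = t.lock(("rel", 42u32), 100).await; // request 100 takes the block
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        });
        {
            let _g = table.lock(("rel", 42u32), 101).await; // may wait, naming 100
        } // guard dropped here, releasing the block before we join the task
        task.await.unwrap();
    }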

@@ -281,7 +281,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         let rel = req.reltag();
 
         let _in_progress_guard =
-            self.in_progress_table.lock(RequestInProgressKey::Rel(rel)).await;
+            self.in_progress_table.lock(RequestInProgressKey::Rel(rel), req.request_id).await;
 
         let not_modified_since = match self.cache.get_rel_exists(&rel) {
             CacheResult::Found(exists) => return NeonIOResult::RelExists(exists),
@@ -309,7 +309,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         let rel = req.reltag();
 
         let _in_progress_guard =
-            self.in_progress_table.lock(RequestInProgressKey::Rel(rel)).await;
+            self.in_progress_table.lock(RequestInProgressKey::Rel(rel), req.request_id).await;
 
         // Check the cache first
         let not_modified_since = match self.cache.get_rel_size(&rel) {
@@ -360,7 +360,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         self.request_db_size_counter.inc();
 
         let _in_progress_guard = self
             .in_progress_table
-            .lock(RequestInProgressKey::Db(req.db_oid))
+            .lock(RequestInProgressKey::Db(req.db_oid), req.request_id)
             .await;
         // Check the cache first
@@ -396,7 +396,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         let rel = req.reltag();
         let _in_progress_guard = self
             .in_progress_table
-            .lock(RequestInProgressKey::Block(rel, req.block_number))
+            .lock(RequestInProgressKey::Block(rel, req.block_number), req.request_id)
             .await;
         self.cache
             .remember_page(&rel, req.block_number, req.src, Lsn(req.lsn), true)
@@ -409,7 +409,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         let rel = req.reltag();
         let _in_progress_guard = self
             .in_progress_table
-            .lock(RequestInProgressKey::Block(rel, req.block_number))
+            .lock(RequestInProgressKey::Block(rel, req.block_number), req.request_id)
             .await;
         self.cache
             .remember_page(&rel, req.block_number, req.src, Lsn(req.lsn), true)
@@ -474,7 +474,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         // because they're always acquired in the same order.
         let in_progress_guard = self
             .in_progress_table
-            .lock(RequestInProgressKey::Block(rel, blkno))
+            .lock(RequestInProgressKey::Block(rel, blkno), req.request_id)
             .await;
 
         let dest = req.dest[i as usize];
@@ -571,7 +571,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         // because they're always acquired in the same order.
         let in_progress_guard = self
             .in_progress_table
-            .lock(RequestInProgressKey::Block(rel, blkno))
+            .lock(RequestInProgressKey::Block(rel, blkno), req.request_id)
             .await;
 
         let not_modified_since = match self.cache.page_is_cached(&rel, blkno).await {
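
Finally, a sketch of a caller in the style of the call sites in this second
file. The handler name, the do_read_io stub, and the key shape are
hypothetical; what matches the diff is the shape of the usage: the guard is
acquired with the request's own ID, held across the whole IO so that the ID
stored in the table always names the current holder, and released by Drop.

    // Hypothetical handler built on the LockTable sketch above.
    async fn handle_get_page(
        table: &LockTable<(&'static str, u32)>,
        blkno: u32,
        request_id: u64,
    ) -> Vec<u8> {
        // Held for the whole IO: a conflicting request for the same block
        // logs our request_id while it waits.
        let _in_progress_guard = table.lock(("rel", blkno), request_id).await;
        do_read_io(blkno).await
        // _in_progress_guard drops here, removing the table entry and
        // waking any waiter.
    }

    // Stub standing in for the real cache/pageserver read.
    async fn do_read_io(_blkno: u32) -> Vec<u8> {
        vec![0u8; 8192]
    }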