From 5f2d476a5873bc28b7fd27633fcf351820f325cf Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas
Date: Fri, 4 Jul 2025 15:30:56 +0300
Subject: [PATCH] Add request ID to io-in-progress locking table, to ease
 debugging

I also added INFO messages for when a backend blocks on the
io-in-progress lock. It's probably too noisy for production, but useful
now to get a picture of how much it happens.
---
 .../src/worker_process/in_progress_ios.rs     | 45 ++++++++++++-------
 .../src/worker_process/main_loop.rs           | 14 +++---
 2 files changed, 35 insertions(+), 24 deletions(-)

diff --git a/pgxn/neon/communicator/src/worker_process/in_progress_ios.rs b/pgxn/neon/communicator/src/worker_process/in_progress_ios.rs
index f2c738b2be..520208a607 100644
--- a/pgxn/neon/communicator/src/worker_process/in_progress_ios.rs
+++ b/pgxn/neon/communicator/src/worker_process/in_progress_ios.rs
@@ -1,3 +1,6 @@
+//! Lock table to ensure that only one IO request is in flight for a given
+//! block (or relation or database metadata) at a time
+
 use std::cmp::Eq;
 use std::hash::Hash;
 use std::sync::Arc;
@@ -16,67 +19,75 @@ pub enum RequestInProgressKey {
     Block(RelTag, u32),
 }
 
-pub type RequestInProgressTable = MutexHashSet<RequestInProgressKey>;
+type RequestId = u64;
+
+pub type RequestInProgressTable = MutexHashMap<RequestInProgressKey, RequestId>;
 
 // more primitive locking thingie:
 
-pub struct MutexHashSet<K>
+pub struct MutexHashMap<K, V>
 where
     K: Clone + Eq + Hash,
 {
-    lock_table: ClashMap<K, Arc<Mutex<()>>>,
+    lock_table: ClashMap<K, (V, Arc<Mutex<()>>)>,
 }
 
-pub struct MutexHashSetGuard<'a, K>
+pub struct MutexHashMapGuard<'a, K, V>
 where
     K: Clone + Eq + Hash,
 {
     pub key: K,
-    set: &'a MutexHashSet<K>,
+    map: &'a MutexHashMap<K, V>,
     mutex: Arc<Mutex<()>>,
     _guard: OwnedMutexGuard<()>,
 }
 
-impl<'a, K> Drop for MutexHashSetGuard<'a, K>
+impl<'a, K, V> Drop for MutexHashMapGuard<'a, K, V>
 where
     K: Clone + Eq + Hash,
 {
     fn drop(&mut self) {
-        let (_old_key, old_val) = self.set.lock_table.remove(&self.key).unwrap();
-        assert!(Arc::ptr_eq(&old_val, &self.mutex));
+        let (_old_key, old_val) = self.map.lock_table.remove(&self.key).unwrap();
+        assert!(Arc::ptr_eq(&old_val.1, &self.mutex));
         // the guard will be dropped as we return
     }
 }
 
-impl<K> MutexHashSet<K>
+impl<K, V> MutexHashMap<K, V>
 where
     K: Clone + Eq + Hash,
+    V: std::fmt::Display + Copy,
 {
-    pub fn new() -> MutexHashSet<K> {
-        MutexHashSet {
+    pub fn new() -> MutexHashMap<K, V> {
+        MutexHashMap {
            lock_table: ClashMap::new(),
         }
     }
 
-    pub async fn lock<'a>(&'a self, key: K) -> MutexHashSetGuard<'a, K> {
+    pub async fn lock<'a>(&'a self, key: K, val: V) -> MutexHashMapGuard<'a, K, V> {
         let my_mutex = Arc::new(Mutex::new(()));
         let my_guard = Arc::clone(&my_mutex).lock_owned().await;
 
         loop {
-            let lock = match self.lock_table.entry(key.clone()) {
-                Entry::Occupied(e) => Arc::clone(e.get()),
+            let (request_id, lock) = match self.lock_table.entry(key.clone()) {
+                Entry::Occupied(e) => {
+                    let e = e.get();
+                    (e.0, Arc::clone(&e.1))
+                },
                 Entry::Vacant(e) => {
-                    e.insert(Arc::clone(&my_mutex));
+                    e.insert((val, Arc::clone(&my_mutex)));
                     break;
                 }
             };
+            tracing::info!("waiting for conflicting IO {request_id} to complete");
             let _ = lock.lock().await;
+            tracing::info!("conflicting IO {request_id} completed");
         }
 
-        MutexHashSetGuard {
+        MutexHashMapGuard {
             key,
-            set: self,
+            map: self,
             mutex: my_mutex,
             _guard: my_guard,
         }
     }
diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs
index 4ec907a2f2..2b19024038 100644
--- a/pgxn/neon/communicator/src/worker_process/main_loop.rs
+++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs
@@ -281,7 +281,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         let rel = req.reltag();
 
         let _in_progress_guard =
-            self.in_progress_table.lock(RequestInProgressKey::Rel(rel)).await;
+            self.in_progress_table.lock(RequestInProgressKey::Rel(rel), req.request_id).await;
 
         let not_modified_since = match self.cache.get_rel_exists(&rel) {
             CacheResult::Found(exists) => return NeonIOResult::RelExists(exists),
@@ -309,7 +309,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         let rel = req.reltag();
 
         let _in_progress_guard =
-            self.in_progress_table.lock(RequestInProgressKey::Rel(rel)).await;
+            self.in_progress_table.lock(RequestInProgressKey::Rel(rel), req.request_id).await;
 
         // Check the cache first
         let not_modified_since = match self.cache.get_rel_size(&rel) {
@@ -360,7 +360,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         self.request_db_size_counter.inc();
         let _in_progress_guard = self
             .in_progress_table
-            .lock(RequestInProgressKey::Db(req.db_oid))
+            .lock(RequestInProgressKey::Db(req.db_oid), req.request_id)
             .await;
 
         // Check the cache first
@@ -396,7 +396,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         let rel = req.reltag();
         let _in_progress_guard = self
             .in_progress_table
-            .lock(RequestInProgressKey::Block(rel, req.block_number))
+            .lock(RequestInProgressKey::Block(rel, req.block_number), req.request_id)
             .await;
         self.cache
             .remember_page(&rel, req.block_number, req.src, Lsn(req.lsn), true)
@@ -409,7 +409,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         let rel = req.reltag();
         let _in_progress_guard = self
             .in_progress_table
-            .lock(RequestInProgressKey::Block(rel, req.block_number))
+            .lock(RequestInProgressKey::Block(rel, req.block_number), req.request_id)
             .await;
         self.cache
             .remember_page(&rel, req.block_number, req.src, Lsn(req.lsn), true)
@@ -474,7 +474,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
             // because they're always acquired in the same order.
             let in_progress_guard = self
                 .in_progress_table
-                .lock(RequestInProgressKey::Block(rel, blkno))
+                .lock(RequestInProgressKey::Block(rel, blkno), req.request_id)
                 .await;
 
             let dest = req.dest[i as usize];
@@ -571,7 +571,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
             // because they're always acquired in the same order.
             let in_progress_guard = self
                 .in_progress_table
-                .lock(RequestInProgressKey::Block(rel, blkno))
+                .lock(RequestInProgressKey::Block(rel, blkno), req.request_id)
                 .await;
 
             let not_modified_since = match self.cache.page_is_cached(&rel, blkno).await {
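
For readers unfamiliar with the pattern: the table above is a keyed async
lock. Each in-flight IO inserts a pre-locked tokio mutex under its key;
conflicting requests wait on that mutex and then retry the insert. The
sketch below (illustrative only, not part of the diff) shows the same idea
with the request ID stored next to the mutex so a waiter can report whom it
is blocked on. KeyedLocks, the u32 key, and the explicit unlock() method are
invented for the example (the real code uses ClashMap and a Drop guard), and
it assumes the tokio crate with the "full" feature.

    use std::collections::hash_map::Entry;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex as StdMutex};

    use tokio::sync::{Mutex, OwnedMutexGuard};

    /// Hypothetical simplified keyed lock; a std HashMap behind a std
    /// Mutex stands in for the patch's ClashMap.
    struct KeyedLocks {
        table: StdMutex<HashMap<u32, (u64, Arc<Mutex<()>>)>>,
    }

    impl KeyedLocks {
        fn new() -> Self {
            KeyedLocks {
                table: StdMutex::new(HashMap::new()),
            }
        }

        /// Wait until no other request holds `key`, then publish our own
        /// pre-locked mutex so later callers block on us.
        async fn lock(&self, key: u32, request_id: u64) -> OwnedMutexGuard<()> {
            let my_mutex = Arc::new(Mutex::new(()));
            let my_guard = Arc::clone(&my_mutex).lock_owned().await;
            loop {
                // Hold the table lock only briefly; never across an await.
                let conflict = match self.table.lock().unwrap().entry(key) {
                    Entry::Occupied(e) => {
                        let (holder, lock) = e.get();
                        Some((*holder, Arc::clone(lock)))
                    }
                    Entry::Vacant(e) => {
                        e.insert((request_id, Arc::clone(&my_mutex)));
                        None
                    }
                };
                match conflict {
                    Some((holder, lock)) => {
                        println!("request {request_id}: waiting for conflicting IO {holder}");
                        // Wakes when the holder removes its entry and
                        // releases its mutex; then retry the insert.
                        let _ = lock.lock().await;
                    }
                    None => return my_guard,
                }
            }
        }

        /// Remove the entry first, then release the mutex, mirroring the
        /// order in the patch's Drop impl; waiters wake and retry.
        fn unlock(&self, key: u32, guard: OwnedMutexGuard<()>) {
            self.table.lock().unwrap().remove(&key);
            drop(guard);
        }
    }

    #[tokio::main]
    async fn main() {
        let locks = Arc::new(KeyedLocks::new());
        let locks2 = Arc::clone(&locks);
        let guard = locks.lock(7, 1).await; // request 1 takes block 7
        let waiter = tokio::spawn(async move {
            let guard2 = locks2.lock(7, 2).await; // blocks until request 1 unlocks
            locks2.unlock(7, guard2);
        });
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        locks.unlock(7, guard);
        waiter.await.unwrap();
    }

Note the retry loop: rather than handing the lock over to a waiter directly,
a woken waiter re-runs the insert race from scratch, so fairness is
best-effort. That matches the structure of the lock() function in the patch.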