Minor cleanup and commenting

This commit is contained in:
Heikki Linnakangas
2025-07-04 18:28:34 +03:00
parent bd46dd60a0
commit 1856bbbb9f
4 changed files with 61 additions and 26 deletions

View File

@@ -31,6 +31,13 @@ pub extern "C" fn rcommunicator_backend_init(
cis: Box<CommunicatorInitStruct>,
my_proc_number: i32,
) -> &'static mut CommunicatorBackendStruct<'static> {
if my_proc_number < 0 || my_proc_number as u32 >= cis.max_procs {
panic!(
"cannot attach to communicator shared memory with procnumber {} (max_procs {})",
my_proc_number, cis.max_procs,
);
}
let start_idx = my_proc_number as u32 * cis.num_neon_request_slots_per_backend;
let end_idx = start_idx + cis.num_neon_request_slots_per_backend;

View File

@@ -74,7 +74,7 @@ where
Entry::Occupied(e) => {
let e = e.get();
(e.0, Arc::clone(&e.1))
},
}
Entry::Vacant(e) => {
e.insert((val, Arc::clone(&my_mutex)));
break;

View File

@@ -218,15 +218,17 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
let request_id = slot.get_request().request_id();
trace!("spawned task to process request {request_id} at slot {request_idx}");
// FIXME: as a temporary hack, abort request if we don't get a response promptly.
// Lots of regression tests are getting stuck and failing at the moment,
// this makes them fail a little faster, which it faster to iterate.
// This needs to be removed once more regression tests are passing.
// FIXME: as a temporary hack, abort request if we don't get a response promptly.
// Lots of regression tests are getting stuck and failing at the moment,
// this makes them fail a little faster, which it faster to iterate.
// This needs to be removed once more regression tests are passing.
// See also similar hack in the backend code, in wait_request_completion()
let result = tokio::time::timeout(
tokio::time::Duration::from_secs(30),
self.handle_request(slot.get_request())
).await.unwrap_or_else(|_elapsed| {
self.handle_request(slot.get_request()),
)
.await
.unwrap_or_else(|_elapsed| {
info!("request {request_id} timed out");
NeonIOResult::Error(libc::ETIMEDOUT)
});
@@ -295,8 +297,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
self.request_rel_exists_counter.inc();
let rel = req.reltag();
let _in_progress_guard =
self.in_progress_table.lock(RequestInProgressKey::Rel(rel), req.request_id).await;
let _in_progress_guard = self
.in_progress_table
.lock(RequestInProgressKey::Rel(rel), req.request_id)
.await;
let not_modified_since = match self.cache.get_rel_exists(&rel) {
CacheResult::Found(exists) => return NeonIOResult::RelExists(exists),
@@ -323,8 +327,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
self.request_rel_size_counter.inc();
let rel = req.reltag();
let _in_progress_guard =
self.in_progress_table.lock(RequestInProgressKey::Rel(rel), req.request_id).await;
let _in_progress_guard = self
.in_progress_table
.lock(RequestInProgressKey::Rel(rel), req.request_id)
.await;
// Check the cache first
let not_modified_since = match self.cache.get_rel_size(&rel) {
@@ -411,7 +417,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
let rel = req.reltag();
let _in_progress_guard = self
.in_progress_table
.lock(RequestInProgressKey::Block(rel, req.block_number), req.request_id)
.lock(
RequestInProgressKey::Block(rel, req.block_number),
req.request_id,
)
.await;
self.cache
.remember_page(&rel, req.block_number, req.src, Lsn(req.lsn), true)
@@ -424,7 +433,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
let rel = req.reltag();
let _in_progress_guard = self
.in_progress_table
.lock(RequestInProgressKey::Block(rel, req.block_number), req.request_id)
.lock(
RequestInProgressKey::Block(rel, req.block_number),
req.request_id,
)
.await;
self.cache
.remember_page(&rel, req.block_number, req.src, Lsn(req.lsn), true)
@@ -521,7 +533,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
// Form a pageserver request for the cache misses
let read_lsn = self.request_lsns(not_modified_since);
let block_numbers: Vec<u32> = cache_misses.iter().map(|(blkno, _lsn, _dest, _guard)| *blkno).collect();
let block_numbers: Vec<u32> = cache_misses
.iter()
.map(|(blkno, _lsn, _dest, _guard)| *blkno)
.collect();
info!(
"sending getpage request for blocks {:?} in rel {:?} lsns {}",
block_numbers, rel, read_lsn
@@ -543,11 +558,14 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
if resp.page_images.len() != block_numbers.len() {
error!(
"received unexpected response with {} page images from pageserver for a request for {} pages",
resp.page_images.len(), block_numbers.len(),
resp.page_images.len(),
block_numbers.len(),
);
return Err(-1);
}
for (page_image, (blkno, _lsn, dest, _guard)) in resp.page_images.into_iter().zip(cache_misses) {
for (page_image, (blkno, _lsn, dest, _guard)) in
resp.page_images.into_iter().zip(cache_misses)
{
let src: &[u8] = page_image.as_ref();
let len = std::cmp::min(src.len(), dest.bytes_total());
unsafe {
@@ -565,7 +583,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
)
.await;
}
},
}
Err(err) => {
info!("tonic error: {err:?}");
return Err(-1);
@@ -607,7 +625,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
.map(|(_blkno, lsn, _guard)| *lsn)
.max()
.unwrap();
let block_numbers: Vec<u32> = cache_misses.iter().map(|(blkno, _lsn, _guard)| *blkno).collect();
let block_numbers: Vec<u32> = cache_misses
.iter()
.map(|(blkno, _lsn, _guard)| *blkno)
.collect();
// TODO: spawn separate tasks for these. Use the integrated cache to keep track of the
// in-flight requests
@@ -631,17 +652,20 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
if resp.page_images.len() != block_numbers.len() {
error!(
"received unexpected response with {} page images from pageserver for a request for {} pages",
resp.page_images.len(), block_numbers.len(),
resp.page_images.len(),
block_numbers.len(),
);
return Err(-1);
}
for (page_image, (blkno, _lsn, _guard)) in resp.page_images.into_iter().zip(cache_misses) {
for (page_image, (blkno, _lsn, _guard)) in
resp.page_images.into_iter().zip(cache_misses)
{
self.cache
.remember_page(&rel, blkno, page_image, not_modified_since, false)
.await;
}
},
}
Err(err) => {
info!("tonic error: {err:?}");
return Err(-1);

View File

@@ -109,8 +109,6 @@ static CommunicatorShmemData *communicator_shmem_ptr;
#define MyIOCompletionLatch (&communicator_shmem_ptr->backends[MyProcNumber].io_completion_latch)
static slock_t in_elog;
#define MAX_INFLIGHT_ASYNC_REQUESTS 5
/* request indexes of (prefetch) requests that have been started */
@@ -185,8 +183,6 @@ pg_init_communicator_new(void)
bgw.bgw_main_arg = (Datum) 0;
RegisterBackgroundWorker(&bgw);
SpinLockInit(&in_elog);
}
static size_t
@@ -437,6 +433,7 @@ communicator_new_init(void)
return;
}
/* The communicator process performs different initialization */
if (MyBgworkerEntry && strcmp(MyBgworkerEntry->bgw_function_name, "communicator_new_bgworker_main") == 0)
return;
@@ -696,7 +693,14 @@ communicator_new_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumbe
elog(DEBUG5, "getpagev called for rel %u/%u/%u.%u block %u (%u blocks)",
RelFileInfoFmt(rinfo), forkNum, blockno, nblocks);
/* Fill in the destination buffers in the request */
/*
* Fill in the destination buffer pointers in the request. If the
* destination is a buffer in shared memory, the communicator process can
* write the result directly to the buffer. Otherwise, we need to use a
* "bounce buffer". We only have one bounce buffer, so if bouncing is
* needed and multiple pages were requested, we need to serially perform a
* separate request for each page. Hopefully that is rare.
*/
if (nblocks == 1)
{
if (bounce_needed(buffers[0]))