diff --git a/pgxn/neon/communicator/src/backend_interface.rs b/pgxn/neon/communicator/src/backend_interface.rs
index 17fee7d000..65875c5ece 100644
--- a/pgxn/neon/communicator/src/backend_interface.rs
+++ b/pgxn/neon/communicator/src/backend_interface.rs
@@ -31,6 +31,13 @@ pub extern "C" fn rcommunicator_backend_init(
     cis: Box,
     my_proc_number: i32,
 ) -> &'static mut CommunicatorBackendStruct<'static> {
+    if my_proc_number < 0 || my_proc_number as u32 >= cis.max_procs {
+        panic!(
+            "cannot attach to communicator shared memory with procnumber {} (max_procs {})",
+            my_proc_number, cis.max_procs,
+        );
+    }
+
     let start_idx = my_proc_number as u32 * cis.num_neon_request_slots_per_backend;
     let end_idx = start_idx + cis.num_neon_request_slots_per_backend;
 
diff --git a/pgxn/neon/communicator/src/worker_process/in_progress_ios.rs b/pgxn/neon/communicator/src/worker_process/in_progress_ios.rs
index 520208a607..5979c9c23b 100644
--- a/pgxn/neon/communicator/src/worker_process/in_progress_ios.rs
+++ b/pgxn/neon/communicator/src/worker_process/in_progress_ios.rs
@@ -74,7 +74,7 @@ where
             Entry::Occupied(e) => {
                 let e = e.get();
                 (e.0, Arc::clone(&e.1))
-            },
+            }
             Entry::Vacant(e) => {
                 e.insert((val, Arc::clone(&my_mutex)));
                 break;
diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs
index 8abde953ce..6a80925efd 100644
--- a/pgxn/neon/communicator/src/worker_process/main_loop.rs
+++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs
@@ -218,15 +218,17 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
             let request_id = slot.get_request().request_id();
             trace!("spawned task to process request {request_id} at slot {request_idx}");
 
-            // FIXME: as a temporary hack, abort request if we don't get a response promptly.
-            // Lots of regression tests are getting stuck and failing at the moment,
-            // this makes them fail a little faster, which it faster to iterate.
-            // This needs to be removed once more regression tests are passing.
+            // FIXME: as a temporary hack, abort request if we don't get a response promptly.
+            // Lots of regression tests are getting stuck and failing at the moment,
+            // this makes them fail a little faster, which makes it faster to iterate.
+            // This needs to be removed once more regression tests are passing.
             // See also similar hack in the backend code, in wait_request_completion()
             let result = tokio::time::timeout(
                 tokio::time::Duration::from_secs(30),
-                self.handle_request(slot.get_request())
-            ).await.unwrap_or_else(|_elapsed| {
+                self.handle_request(slot.get_request()),
+            )
+            .await
+            .unwrap_or_else(|_elapsed| {
                 info!("request {request_id} timed out");
                 NeonIOResult::Error(libc::ETIMEDOUT)
             });
@@ -295,8 +297,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         self.request_rel_exists_counter.inc();
         let rel = req.reltag();
 
-        let _in_progress_guard =
-            self.in_progress_table.lock(RequestInProgressKey::Rel(rel), req.request_id).await;
+        let _in_progress_guard = self
+            .in_progress_table
+            .lock(RequestInProgressKey::Rel(rel), req.request_id)
+            .await;
 
         let not_modified_since = match self.cache.get_rel_exists(&rel) {
             CacheResult::Found(exists) => return NeonIOResult::RelExists(exists),
@@ -323,8 +327,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         self.request_rel_size_counter.inc();
         let rel = req.reltag();
 
-        let _in_progress_guard =
-            self.in_progress_table.lock(RequestInProgressKey::Rel(rel), req.request_id).await;
+        let _in_progress_guard = self
+            .in_progress_table
+            .lock(RequestInProgressKey::Rel(rel), req.request_id)
+            .await;
 
         // Check the cache first
         let not_modified_since = match self.cache.get_rel_size(&rel) {
@@ -411,7 +417,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         let rel = req.reltag();
         let _in_progress_guard = self
             .in_progress_table
-            .lock(RequestInProgressKey::Block(rel, req.block_number), req.request_id)
+            .lock(
+                RequestInProgressKey::Block(rel, req.block_number),
+                req.request_id,
+            )
             .await;
         self.cache
             .remember_page(&rel, req.block_number, req.src, Lsn(req.lsn), true)
@@ -424,7 +433,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         let rel = req.reltag();
         let _in_progress_guard = self
             .in_progress_table
-            .lock(RequestInProgressKey::Block(rel, req.block_number), req.request_id)
+            .lock(
+                RequestInProgressKey::Block(rel, req.block_number),
+                req.request_id,
+            )
             .await;
         self.cache
             .remember_page(&rel, req.block_number, req.src, Lsn(req.lsn), true)
@@ -521,7 +533,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
 
         // Form a pageserver request for the cache misses
        let read_lsn = self.request_lsns(not_modified_since);
-        let block_numbers: Vec = cache_misses.iter().map(|(blkno, _lsn, _dest, _guard)| *blkno).collect();
+        let block_numbers: Vec = cache_misses
+            .iter()
+            .map(|(blkno, _lsn, _dest, _guard)| *blkno)
+            .collect();
         info!(
             "sending getpage request for blocks {:?} in rel {:?} lsns {}",
             block_numbers, rel, read_lsn
@@ -543,11 +558,14 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
                 if resp.page_images.len() != block_numbers.len() {
                     error!(
                         "received unexpected response with {} page images from pageserver for a request for {} pages",
-                        resp.page_images.len(), block_numbers.len(),
+                        resp.page_images.len(),
+                        block_numbers.len(),
                     );
                     return Err(-1);
                 }
-                for (page_image, (blkno, _lsn, dest, _guard)) in resp.page_images.into_iter().zip(cache_misses) {
+                for (page_image, (blkno, _lsn, dest, _guard)) in
+                    resp.page_images.into_iter().zip(cache_misses)
+                {
                     let src: &[u8] = page_image.as_ref();
                     let len = std::cmp::min(src.len(), dest.bytes_total());
                     unsafe {
@@ -565,7 +583,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
                         )
                         .await;
                 }
-            },
+            }
             Err(err) => {
                 info!("tonic error: {err:?}");
                 return Err(-1);
@@ -607,7 +625,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
             .map(|(_blkno, lsn, _guard)| *lsn)
             .max()
             .unwrap();
-        let block_numbers: Vec = cache_misses.iter().map(|(blkno, _lsn, _guard)| *blkno).collect();
+        let block_numbers: Vec = cache_misses
+            .iter()
+            .map(|(blkno, _lsn, _guard)| *blkno)
+            .collect();
 
         // TODO: spawn separate tasks for these. Use the integrated cache to keep track of the
         // in-flight requests
@@ -631,17 +652,20 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
                 if resp.page_images.len() != block_numbers.len() {
                     error!(
                         "received unexpected response with {} page images from pageserver for a request for {} pages",
-                        resp.page_images.len(), block_numbers.len(),
+                        resp.page_images.len(),
+                        block_numbers.len(),
                     );
                     return Err(-1);
                 }
 
-                for (page_image, (blkno, _lsn, _guard)) in resp.page_images.into_iter().zip(cache_misses) {
+                for (page_image, (blkno, _lsn, _guard)) in
+                    resp.page_images.into_iter().zip(cache_misses)
+                {
                     self.cache
                         .remember_page(&rel, blkno, page_image, not_modified_since, false)
                         .await;
                 }
-            },
+            }
             Err(err) => {
                 info!("tonic error: {err:?}");
                 return Err(-1);
diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c
index af6f6b18a4..9c49f807f2 100644
--- a/pgxn/neon/communicator_new.c
+++ b/pgxn/neon/communicator_new.c
@@ -109,8 +109,6 @@ static CommunicatorShmemData *communicator_shmem_ptr;
 
 #define MyIOCompletionLatch (&communicator_shmem_ptr->backends[MyProcNumber].io_completion_latch)
 
-static slock_t in_elog;
-
 #define MAX_INFLIGHT_ASYNC_REQUESTS 5
 
 /* request indexes of (prefetch) requests that have been started */
@@ -185,8 +183,6 @@ pg_init_communicator_new(void)
     bgw.bgw_main_arg = (Datum) 0;
 
     RegisterBackgroundWorker(&bgw);
-
-    SpinLockInit(&in_elog);
 }
 
 static size_t
@@ -437,6 +433,7 @@ communicator_new_init(void)
         return;
     }
 
+    /* The communicator process performs different initialization */
     if (MyBgworkerEntry &&
         strcmp(MyBgworkerEntry->bgw_function_name, "communicator_new_bgworker_main") == 0)
         return;
@@ -696,7 +693,14 @@ communicator_new_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumbe
     elog(DEBUG5, "getpagev called for rel %u/%u/%u.%u block %u (%u blocks)",
         RelFileInfoFmt(rinfo), forkNum, blockno, nblocks);
 
-    /* Fill in the destination buffers in the request */
+    /*
+     * Fill in the destination buffer pointers in the request. If the
+     * destination is a buffer in shared memory, the communicator process can
+     * write the result directly to the buffer. Otherwise, we need to use a
+     * "bounce buffer". We only have one bounce buffer, so if bouncing is
+     * needed and multiple pages were requested, we need to serially perform a
+     * separate request for each page. Hopefully that is rare.
+     */
     if (nblocks == 1)
     {
         if (bounce_needed(buffers[0]))
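
Illustration (reviewer's note, not part of the patch): the new comment in communicator_new_read_at_lsnv() describes the read-path dispatch rule: write a page directly into its destination when that destination lives in shared memory; otherwise go through the single bounce buffer, issuing one request per page when several pages need bouncing. The self-contained sketch below models that rule only; every name in it (Dest, needs_bounce, issue_vectored_request, issue_single_request, read_blocks, PAGE_SIZE) is a hypothetical stand-in, not an actual communicator_new.c identifier.

/*
 * Illustration only, not part of the patch. All names below are hypothetical
 * stand-ins for the real communicator code.
 */
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

#define PAGE_SIZE 8192

typedef struct Dest
{
    char   *addr;              /* where the caller wants the page */
    bool    in_shared_memory;  /* can the communicator write here directly? */
} Dest;

/* The single scratch page, analogous to the one bounce buffer. */
static char bounce_buffer[PAGE_SIZE];

static bool
needs_bounce(const Dest *dest)
{
    return !dest->in_shared_memory;
}

/* Stand-ins for handing requests to the communicator process. */
static void
issue_vectored_request(const Dest *dests, size_t nblocks)
{
    (void) dests;
    printf("one vectored request for %zu blocks\n", nblocks);
}

static void
issue_single_request(char *result_dest, size_t i)
{
    printf("single-page request for block %zu -> %p\n", i, (void *) result_dest);
}

/*
 * The rule described by the new comment: if every destination is directly
 * writable, one request covers all pages; otherwise each page is fetched
 * separately, with bounced pages going through the one bounce buffer.
 */
static void
read_blocks(Dest *dests, size_t nblocks)
{
    bool    any_bounce = false;

    for (size_t i = 0; i < nblocks; i++)
        any_bounce |= needs_bounce(&dests[i]);

    if (!any_bounce)
    {
        issue_vectored_request(dests, nblocks);
        return;
    }

    for (size_t i = 0; i < nblocks; i++)
    {
        if (needs_bounce(&dests[i]))
        {
            issue_single_request(bounce_buffer, i);
            /* copy the result out of the bounce buffer */
            memcpy(dests[i].addr, bounce_buffer, PAGE_SIZE);
        }
        else
            issue_single_request(dests[i].addr, i);
    }
}

int
main(void)
{
    static char page0[PAGE_SIZE];
    static char page1[PAGE_SIZE];
    Dest    dests[2] = {
        {page0, false},        /* private memory: must be bounced */
        {page1, true},         /* pretend shared buffer: written directly */
    };

    read_blocks(dests, 2);
    return 0;
}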