diff --git a/pgxn/neon/communicator/src/backend_comms.rs b/pgxn/neon/communicator/src/backend_comms.rs
index c798dcf30e..1da7c6a85e 100644
--- a/pgxn/neon/communicator/src/backend_comms.rs
+++ b/pgxn/neon/communicator/src/backend_comms.rs
@@ -179,9 +179,8 @@ impl NeonIOHandle {
         }
     }
 
+    /// Read the IO request from the slot indicated in the wakeup
     pub fn start_processing_request<'a>(&'a self) -> Option> {
-        // Read the IO request from the slot indicated in the wakeup
-        //
         // XXX: using compare_exchange for this is not strictly necessary, as long as
         // the communicator process has _some_ means of tracking which requests it's
         // already processing. That could be a flag somewhere in communicator's private
diff --git a/pgxn/neon/communicator/src/backend_interface.rs b/pgxn/neon/communicator/src/backend_interface.rs
index 65875c5ece..168fd4ad98 100644
--- a/pgxn/neon/communicator/src/backend_interface.rs
+++ b/pgxn/neon/communicator/src/backend_interface.rs
@@ -12,7 +12,7 @@ use crate::neon_request::{NeonIORequest, NeonIOResult};
 pub struct CommunicatorBackendStruct<'t> {
     my_proc_number: i32,
 
-    next_neon_request_idx: u32,
+    next_request_slot_idx: u32,
 
     my_start_idx: u32, // First request slot that belongs to this backend
     my_end_idx: u32,   // end + 1 request slot that belongs to this backend
@@ -46,7 +46,7 @@ pub extern "C" fn rcommunicator_backend_init(
     let bs: &'static mut CommunicatorBackendStruct =
         Box::leak(Box::new(CommunicatorBackendStruct {
             my_proc_number,
-            next_neon_request_idx: start_idx,
+            next_request_slot_idx: start_idx,
             my_start_idx: start_idx,
             my_end_idx: end_idx,
             neon_request_slots: cis.neon_request_slots,
@@ -83,12 +83,12 @@ pub extern "C" fn bcomm_start_io_request(
     }
 
     // Create neon request and submit it
-    let request_idx = bs.start_neon_request(request);
+    let slot_idx = bs.start_neon_io_request(request);
 
     // Tell the communicator about it
-    bs.submit_request(request_idx);
+    bs.submit_request(slot_idx);
 
-    request_idx
+    slot_idx
 }
 
 #[unsafe(no_mangle)]
@@ -124,12 +124,12 @@ pub extern "C" fn bcomm_start_get_page_v_request(
     }
 
     // Create neon request and submit it
-    let request_idx = bs.start_neon_request(request);
+    let slot_idx = bs.start_neon_io_request(request);
 
     // Tell the communicator about it
-    bs.submit_request(request_idx);
+    bs.submit_request(slot_idx);
 
-    request_idx
+    slot_idx
 }
 
 /// Check if a request has completed. Returns:
@@ -139,10 +139,10 @@ pub extern "C" fn bcomm_start_get_page_v_request(
 #[unsafe(no_mangle)]
 pub extern "C" fn bcomm_poll_request_completion(
     bs: &mut CommunicatorBackendStruct,
-    request_idx: u32,
+    request_slot_idx: u32,
     result_p: &mut NeonIOResult,
 ) -> i32 {
-    match bs.neon_request_slots[request_idx as usize].try_get_result() {
+    match bs.neon_request_slots[request_slot_idx as usize].try_get_result() {
         None => -1, // still processing
         Some(result) => {
             *result_p = result;
@@ -188,7 +188,7 @@ pub extern "C" fn bcomm_cache_contains(
 
 impl<'t> CommunicatorBackendStruct<'t> {
     /// Send a wakeup to the communicator process
-    fn submit_request(self: &CommunicatorBackendStruct<'t>, request_idx: i32) {
+    fn submit_request(self: &CommunicatorBackendStruct<'t>, request_slot_idx: i32) {
        // wake up communicator by writing the idx to the submission pipe
        //
        // This can block, if the pipe is full. That should be very rare,
@@ -198,7 +198,7 @@ impl<'t> CommunicatorBackendStruct<'t> {
        // backend.
        //
        // If it does block very briefly, that's not too serious.
-        let idxbuf = request_idx.to_ne_bytes();
+        let idxbuf = request_slot_idx.to_ne_bytes();
         let _res = nix::unistd::write(&self.submission_pipe_write_fd, &idxbuf);
         // FIXME: check result, return any errors
     }
@@ -206,15 +206,15 @@ impl<'t> CommunicatorBackendStruct<'t> {
 
     /// Note: there's no guarantee on when the communicator might pick it up. You should ring
     /// the doorbell. But it might pick it up immediately.
-    pub(crate) fn start_neon_request(&mut self, request: &NeonIORequest) -> i32 {
+    pub(crate) fn start_neon_io_request(&mut self, request: &NeonIORequest) -> i32 {
         let my_proc_number = self.my_proc_number;
 
         // Grab next free slot
         // FIXME: any guarantee that there will be any?
-        let idx = self.next_neon_request_idx;
+        let idx = self.next_request_slot_idx;
         let next_idx = idx + 1;
-        self.next_neon_request_idx = if next_idx == self.my_end_idx {
+        self.next_request_slot_idx = if next_idx == self.my_end_idx {
             self.my_start_idx
         } else {
             next_idx
diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs
index 193039f6af..5c773fa58e 100644
--- a/pgxn/neon/communicator/src/integrated_cache.rs
+++ b/pgxn/neon/communicator/src/integrated_cache.rs
@@ -626,6 +626,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
         None
     }
 
+    /// Resize the local file cache.
     pub fn resize_file_cache(&self, num_blocks: u32) {
         let old_num_blocks = self.block_map.get_num_buckets() as u32;
 
@@ -638,6 +639,8 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
                     err
                 );
             }
+        } else {
+            // TODO: Shrinking not implemented yet
         }
     }
 
diff --git a/pgxn/neon/communicator/src/worker_process/logging.rs b/pgxn/neon/communicator/src/worker_process/logging.rs
index 2df9ae7bfd..685bc27efd 100644
--- a/pgxn/neon/communicator/src/worker_process/logging.rs
+++ b/pgxn/neon/communicator/src/worker_process/logging.rs
@@ -1,7 +1,7 @@
-//! Glue code to hook up Rust logging, with the `tracing` crate, to the PostgreSQL log
+//! Glue code to hook up Rust logging with the `tracing` crate to the PostgreSQL log
 //!
 //! In the Rust threads, the log messages are written to a mpsc Channel, and the Postgres
-//! process latch is raised.  That wakes up the loop in the main thread. It reads the
+//! process latch is raised. That wakes up the loop in the main thread. It reads the
 //! message from the channel and ereport()s it. This ensures that only one thread, the main
 //! thread, calls the PostgreSQL logging routines at any time.
 
diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs
index 6a80925efd..71e8e43537 100644
--- a/pgxn/neon/communicator/src/worker_process/main_loop.rs
+++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs
@@ -189,34 +189,29 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         loop {
             // Wait for a backend to ring the doorbell
-
             match submission_pipe_read.read(&mut idxbuf).await {
                 Ok(4) => {}
                 Ok(nbytes) => panic!("short read ({nbytes} bytes) on communicator pipe"),
                 Err(e) => panic!("error reading from communicator pipe: {e}"),
             }
-            let request_idx = u32::from_ne_bytes(idxbuf);
+            let slot_idx = u32::from_ne_bytes(idxbuf) as usize;
 
             // Read the IO request from the slot indicated in the wakeup
-            let Some(slot) =
-                self.neon_request_slots[request_idx as usize].start_processing_request()
-            else {
-                // This currently should not happen. But if we have multiple threads picking up
+            let Some(slot) = self.neon_request_slots[slot_idx].start_processing_request() else {
+                // This currently should not happen. But if we had multiple threads picking up
                 // requests, and without waiting for the notifications, it could.
                 panic!("no request in slot");
             };
 
-            // Ok, we have ownership of this request now. We must process
-            // it now, there's no going back.
-
-            //trace!("processing request {request_idx}: {request:?}");
-
+            // Ok, we have ownership of this request now. We must process it now, there's no going
+            // back.
+            //
             // Spawn a separate task for every request. That's a little excessive for requests that
             // can be quickly satisfied from the cache, but we expect that to be rare, because the
             // requesting backend would have already checked the cache.
             tokio::spawn(async move {
                 let request_id = slot.get_request().request_id();
-                trace!("spawned task to process request {request_id} at slot {request_idx}");
+                trace!("spawned task to process request {request_id} at slot {slot_idx}");
 
                 // FIXME: as a temporary hack, abort request if we don't get a response promptly.
                 // Lots of regression tests are getting stuck and failing at the moment,
@@ -232,7 +227,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
                     info!("request {request_id} timed out");
                     NeonIOResult::Error(libc::ETIMEDOUT)
                 });
-                trace!("request {request_id} at slot {request_idx} completed");
+                trace!("request {request_id} at slot {slot_idx} completed");
                 let owner_procno = slot.get_owner_procno();
 
                 // Ok, we have completed the IO. Mark the request as completed. After that,
@@ -246,6 +241,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         }
     }
 
+    /// Compute the 'request_lsn' to use for a pageserver request
     fn request_lsns(&self, not_modified_since_lsn: Lsn) -> page_api::ReadLsn {
         let mut request_lsn = get_request_lsn();
 
@@ -287,6 +283,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         }
     }
 
+    /// Handle one IO request
     async fn handle_request(&'static self, req: &'_ NeonIORequest) -> NeonIOResult {
         match req {
             NeonIORequest::Empty => {
@@ -302,6 +299,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
                     .lock(RequestInProgressKey::Rel(rel), req.request_id)
                     .await;
 
+                // Check the cache first
                 let not_modified_since = match self.cache.get_rel_exists(&rel) {
                     CacheResult::Found(exists) => return NeonIOResult::RelExists(exists),
                     CacheResult::NotFound(lsn) => lsn,
@@ -413,7 +411,6 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
             NeonIORequest::WritePage(req) => {
                 self.request_write_page_counter.inc();
 
-                // Also store it in the LFC while we still have it
                 let rel = req.reltag();
                 let _in_progress_guard = self
                     .in_progress_table
@@ -422,6 +419,9 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
                         req.request_id,
                     )
                     .await;
+
+                // We must at least update the last-written LSN on the page, but also store the page
+                // image in the LFC while we still have it
                 self.cache
                     .remember_page(&rel, req.block_number, req.src, Lsn(req.lsn), true)
                     .await;
@@ -438,6 +438,9 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
                         req.request_id,
                     )
                     .await;
+
+                // We must at least update the last-written LSN on the page and the relation size,
+                // but also store the page image in the LFC while we still have it
                 self.cache
                     .remember_page(&rel, req.block_number, req.src, Lsn(req.lsn), true)
                     .await;
@@ -481,6 +484,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         }
     }
 
+    /// Subroutine to handle a GetPageV request, since it's a little more complicated than
+    /// others.
     async fn handle_get_pagev_request(&'t self, req: &CGetPageVRequest) -> Result<(), i32> {
         let rel = req.reltag();
 
@@ -488,10 +493,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         //
         // Note: Because the backends perform a direct lookup in the cache before sending
         // the request to the communicator process, we expect the pages to almost never
-        // be already in cache. It could happen when:
+        // be already in cache. It could happen if:
         // 1. two backends try to read the same page at the same time, but that should never
         //    happen because there's higher level locking in the Postgres buffer manager, or
-        // 2. if a prefetch request finished at the same time as a backend requested the
+        // 2. a prefetch request finished at the same time as a backend requested the
         //    page. That's much more likely.
         let mut cache_misses = Vec::with_capacity(req.nblocks as usize);
         for i in 0..req.nblocks {
@@ -530,13 +535,12 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
             .max()
             .unwrap();
 
-        // Form a pageserver request for the cache misses
-        let read_lsn = self.request_lsns(not_modified_since);
-
+        // Construct a pageserver request for the cache misses
         let block_numbers: Vec = cache_misses
             .iter()
             .map(|(blkno, _lsn, _dest, _guard)| *blkno)
             .collect();
+        let read_lsn = self.request_lsns(not_modified_since);
         info!(
             "sending getpage request for blocks {:?} in rel {:?} lsns {}",
             block_numbers, rel, read_lsn
@@ -592,6 +596,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         Ok(())
     }
 
+    /// Subroutine to handle a PrefetchV request, since it's a little more complicated than
+    /// others.
+    ///
+    /// This is very similar to a GetPageV request, but the results are only stored in the cache.
     async fn handle_prefetchv_request(&'static self, req: &CPrefetchVRequest) -> Result<(), i32> {
         let rel = req.reltag();