diff --git a/pgxn/neon/communicator/src/backend_comms.rs b/pgxn/neon/communicator/src/backend_comms.rs
index 1da7c6a85e..998e0daf71 100644
--- a/pgxn/neon/communicator/src/backend_comms.rs
+++ b/pgxn/neon/communicator/src/backend_comms.rs
@@ -165,6 +165,10 @@ impl NeonIOHandle {
             .store(NeonIOHandleState::Submitted, Ordering::Release);
     }
 
+    pub fn get_state(&self) -> NeonIOHandleState {
+        self.state.load(Ordering::Relaxed)
+    }
+
     pub fn try_get_result(&self) -> Option<NeonIOResult> {
         // FIXME: ordering?
         let state = self.state.load(Ordering::Relaxed);
diff --git a/pgxn/neon/communicator/src/backend_interface.rs b/pgxn/neon/communicator/src/backend_interface.rs
index 168fd4ad98..9ed9028b96 100644
--- a/pgxn/neon/communicator/src/backend_interface.rs
+++ b/pgxn/neon/communicator/src/backend_interface.rs
@@ -12,11 +12,6 @@ use crate::neon_request::{NeonIORequest, NeonIOResult};
 pub struct CommunicatorBackendStruct<'t> {
     my_proc_number: i32,
 
-    next_request_slot_idx: u32,
-
-    my_start_idx: u32, // First request slot that belongs to this backend
-    my_end_idx: u32,   // end + 1 request slot that belongs to this backend
-
     neon_request_slots: &'t [NeonIOHandle],
 
     submission_pipe_write_fd: OwnedFd,
@@ -31,24 +26,18 @@ pub extern "C" fn rcommunicator_backend_init(
     cis: Box<CommunicatorInitStruct>,
     my_proc_number: i32,
 ) -> &'static mut CommunicatorBackendStruct<'static> {
-    if my_proc_number < 0 || my_proc_number as u32 >= cis.max_procs {
+    if my_proc_number < 0 {
         panic!(
-            "cannot attach to communicator shared memory with procnumber {} (max_procs {})",
-            my_proc_number, cis.max_procs,
+            "cannot attach to communicator shared memory with procnumber {}",
+            my_proc_number,
         );
     }
 
-    let start_idx = my_proc_number as u32 * cis.num_neon_request_slots_per_backend;
-    let end_idx = start_idx + cis.num_neon_request_slots_per_backend;
-
     let integrated_cache = Box::leak(Box::new(cis.integrated_cache_init_struct.backend_init()));
 
     let bs: &'static mut CommunicatorBackendStruct =
         Box::leak(Box::new(CommunicatorBackendStruct {
             my_proc_number,
-            next_request_slot_idx: start_idx,
-            my_start_idx: start_idx,
-            my_end_idx: end_idx,
 
             neon_request_slots: cis.neon_request_slots,
             submission_pipe_write_fd: cis.submission_pipe_write_fd,
@@ -66,9 +55,11 @@ pub extern "C" fn rcommunicator_backend_init(
 /// latch is set.
 ///
 /// Safety: The C caller must ensure that the references are valid.
+/// The requested slot must be free, or this panics.
 #[unsafe(no_mangle)]
 pub extern "C" fn bcomm_start_io_request(
     bs: &'_ mut CommunicatorBackendStruct,
+    slot_idx: i32,
     request: &NeonIORequest,
     immediate_result_ptr: &mut NeonIOResult,
 ) -> i32 {
@@ -83,7 +74,7 @@ pub extern "C" fn bcomm_start_io_request(
     }
 
     // Create neon request and submit it
-    let slot_idx = bs.start_neon_io_request(request);
+    bs.start_neon_io_request(slot_idx, request);
 
     // Tell the communicator about it
     bs.submit_request(slot_idx);
@@ -94,6 +85,7 @@
 #[unsafe(no_mangle)]
 pub extern "C" fn bcomm_start_get_page_v_request(
     bs: &mut CommunicatorBackendStruct,
+    slot_idx: i32,
     request: &NeonIORequest,
     immediate_result_ptr: &mut CCachedGetPageVResult,
 ) -> i32 {
@@ -124,7 +116,7 @@ pub extern "C" fn bcomm_start_get_page_v_request(
     }
 
     // Create neon request and submit it
-    let slot_idx = bs.start_neon_io_request(request);
+    bs.start_neon_io_request(slot_idx, request);
 
     // Tell the communicator about it
     bs.submit_request(slot_idx);
@@ -151,6 +143,32 @@ pub extern "C" fn bcomm_poll_request_completion(
     }
 }
 
+/// Check the status of a request slot. Returns:
+///
+/// 'false' if the slot is Idle. The backend process has ownership.
+/// 'true' if the slot is busy, and should be polled for result.
+#[unsafe(no_mangle)]
+pub extern "C" fn bcomm_get_request_slot_status(
+    bs: &mut CommunicatorBackendStruct,
+    request_slot_idx: u32,
+) -> bool {
+    use crate::backend_comms::NeonIOHandleState;
+    match bs.neon_request_slots[request_slot_idx as usize].get_state() {
+        NeonIOHandleState::Idle => false,
+        NeonIOHandleState::Filling => {
+            // 'false' would be the right result here. However, this
+            // is a very transient state. The C code should never
+            // leave a slot in this state, so if it sees that,
+            // something's gone wrong and it's not clear what to do
+            // with it.
+            panic!("unexpected Filling state in request slot {}", request_slot_idx);
+        },
+        NeonIOHandleState::Submitted => true,
+        NeonIOHandleState::Processing => true,
+        NeonIOHandleState::Completed => true,
+    }
+}
+
 // LFC functions
 
 /// Finish a local file cache read
@@ -206,22 +224,11 @@ impl<'t> CommunicatorBackendStruct<'t> {
     /// Note: there's no guarantee on when the communicator might pick it up. You should ring
     /// the doorbell. But it might pick it up immediately.
-    pub(crate) fn start_neon_io_request(&mut self, request: &NeonIORequest) -> i32 {
+    ///
+    /// The slot must be free, or this panics.
+    pub(crate) fn start_neon_io_request(&mut self, request_slot_idx: i32, request: &NeonIORequest) {
         let my_proc_number = self.my_proc_number;
 
-        // Grab next free slot
-        // FIXME: any guarantee that there will be any?
-        let idx = self.next_request_slot_idx;
-
-        let next_idx = idx + 1;
-        self.next_request_slot_idx = if next_idx == self.my_end_idx {
-            self.my_start_idx
-        } else {
-            next_idx
-        };
-
-        self.neon_request_slots[idx as usize].fill_request(request, my_proc_number);
-
-        idx as i32
+        self.neon_request_slots[request_slot_idx as usize].fill_request(request, my_proc_number);
     }
 }
diff --git a/pgxn/neon/communicator/src/init.rs b/pgxn/neon/communicator/src/init.rs
index 0053016e55..5f7d593c35 100644
--- a/pgxn/neon/communicator/src/init.rs
+++ b/pgxn/neon/communicator/src/init.rs
@@ -26,20 +26,15 @@ use std::os::fd::OwnedFd;
 use crate::backend_comms::NeonIOHandle;
 use crate::integrated_cache::IntegratedCacheInitStruct;
 
-const NUM_NEON_REQUEST_SLOTS_PER_BACKEND: u32 = 5;
-
 /// This struct is created in the postmaster process, and inherited to
 /// the communicator process and all backend processes through fork()
 #[repr(C)]
 pub struct CommunicatorInitStruct {
-    #[allow(dead_code)]
-    pub max_procs: u32,
-
     pub submission_pipe_read_fd: OwnedFd,
     pub submission_pipe_write_fd: OwnedFd,
 
     // Shared memory data structures
-    pub num_neon_request_slots_per_backend: u32,
+    pub num_neon_request_slots: u32,
 
     pub neon_request_slots: &'static [NeonIOHandle],
 
@@ -49,12 +44,11 @@ pub struct CommunicatorInitStruct {
 impl std::fmt::Debug for CommunicatorInitStruct {
     fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
         fmt.debug_struct("CommunicatorInitStruct")
-            .field("max_procs", &self.max_procs)
             .field("submission_pipe_read_fd", &self.submission_pipe_read_fd)
             .field("submission_pipe_write_fd", &self.submission_pipe_write_fd)
             .field(
-                "num_neon_request_slots_per_backend",
-                &self.num_neon_request_slots_per_backend,
+                "num_neon_request_slots",
+                &self.num_neon_request_slots,
             )
             .field("neon_request_slots length", &self.neon_request_slots.len())
             .finish()
@@ -62,14 +56,13 @@
 }
 
 #[unsafe(no_mangle)]
-pub extern "C" fn rcommunicator_shmem_size(max_procs: u32) -> u64 {
+pub extern "C" fn rcommunicator_shmem_size(num_neon_request_slots: u32) -> u64 {
     let mut size = 0;
 
-    let num_neon_request_slots = max_procs * NUM_NEON_REQUEST_SLOTS_PER_BACKEND;
     size += mem::size_of::<NeonIOHandle>() * num_neon_request_slots as usize;
 
     // For integrated_cache's Allocator. TODO: make this adjustable
-    size += IntegratedCacheInitStruct::shmem_size(max_procs);
+    size += IntegratedCacheInitStruct::shmem_size();
 
     size as u64
 }
@@ -80,7 +73,7 @@ pub extern "C" fn rcommunicator_shmem_size(max_procs: u32) -> u64 {
 pub extern "C" fn rcommunicator_shmem_init(
     submission_pipe_read_fd: c_int,
     submission_pipe_write_fd: c_int,
-    max_procs: u32,
+    num_neon_request_slots: u32,
     shmem_area_ptr: *mut MaybeUninit<u8>,
     shmem_area_len: u64,
     initial_file_cache_size: u64,
@@ -89,12 +82,8 @@ pub extern "C" fn rcommunicator_shmem_init(
     let shmem_area: &'static mut [MaybeUninit<u8>] =
         unsafe { std::slice::from_raw_parts_mut(shmem_area_ptr, shmem_area_len as usize) };
 
-    // Carve out the request slots from the shmem area and initialize them
-    let num_neon_request_slots_per_backend = NUM_NEON_REQUEST_SLOTS_PER_BACKEND as usize;
-    let num_neon_request_slots = max_procs as usize * num_neon_request_slots_per_backend;
-
     let (neon_request_slots, remaining_area) =
-        alloc_array_from_slice::<NeonIOHandle>(shmem_area, num_neon_request_slots);
+        alloc_array_from_slice::<NeonIOHandle>(shmem_area, num_neon_request_slots as usize);
 
     for slot in neon_request_slots.iter_mut() {
         slot.write(NeonIOHandle::default());
@@ -110,7 +99,6 @@ pub extern "C" fn rcommunicator_shmem_init(
 
     // Give the rest of the area to the integrated cache
    let integrated_cache_init_struct = IntegratedCacheInitStruct::shmem_init(
-        max_procs,
         remaining_area,
         initial_file_cache_size,
         max_file_cache_size,
@@ -125,11 +113,10 @@
     };
 
     let cis: &'static mut CommunicatorInitStruct =
         Box::leak(Box::new(CommunicatorInitStruct {
-            max_procs,
             submission_pipe_read_fd,
             submission_pipe_write_fd,
-            num_neon_request_slots_per_backend: NUM_NEON_REQUEST_SLOTS_PER_BACKEND,
+            num_neon_request_slots,
 
             neon_request_slots,
             integrated_cache_init_struct,
diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs
index 5c773fa58e..5f0ca5f510 100644
--- a/pgxn/neon/communicator/src/integrated_cache.rs
+++ b/pgxn/neon/communicator/src/integrated_cache.rs
@@ -78,7 +78,7 @@ pub struct IntegratedCacheReadAccess<'t> {
 impl<'t> IntegratedCacheInitStruct<'t> {
     /// Return the desired size in bytes of the fixed-size shared memory area to reserve for the
     /// integrated cache.
-    pub fn shmem_size(_max_procs: u32) -> usize {
+    pub fn shmem_size() -> usize {
         // The relsize cache is fixed-size. The block map is allocated in a separate resizable
         // area.
         HashMapInit::estimate_size(RELSIZE_CACHE_SIZE)
@@ -87,7 +87,6 @@ impl<'t> IntegratedCacheInitStruct<'t> {
     /// Initialize the shared memory segment. This runs once in postmaster. Returns a struct which
     /// will be inherited by all processes through fork.
     pub fn shmem_init(
-        _max_procs: u32,
         shmem_area: &'t mut [MaybeUninit<u8>],
         initial_file_cache_size: u64,
         max_file_cache_size: u64,
diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c
index 226a55ac01..b809358c45 100644
--- a/pgxn/neon/communicator_new.c
+++ b/pgxn/neon/communicator_new.c
@@ -115,6 +115,10 @@ static CommunicatorShmemData *communicator_shmem_ptr;
 static int inflight_requests[MAX_INFLIGHT_ASYNC_REQUESTS];
 static int num_inflight_requests = 0;
 
+static int my_start_slot_idx;
+static int my_end_slot_idx;
+static int my_next_slot_idx;
+
 static int start_request(NeonIORequest *request, struct NeonIOResult *immediate_result_p);
 static void wait_request_completion(int request_idx, struct NeonIOResult *result_p);
 static void perform_request(NeonIORequest *request, struct NeonIOResult *result_p);
@@ -189,14 +193,17 @@ static size_t
 communicator_new_shmem_size(void)
 {
 	size_t		size = 0;
+	int			num_request_slots;
 
 	size += MAXALIGN(
 		offsetof(CommunicatorShmemData, backends) +
 		MaxProcs * sizeof(CommunicatorShmemPerBackendData)
 	);
 
+	num_request_slots = MaxProcs * MAX_INFLIGHT_ASYNC_REQUESTS;
+
 	/* space needed by the rust code */
-	size += rcommunicator_shmem_size(MaxProcs);
+	size += rcommunicator_shmem_size(num_request_slots);
 
 	return size;
 }
@@ -256,7 +263,7 @@ communicator_new_shmem_startup(void)
 		max_file_cache_size = 100;
 
 	/* Initialize the rust-managed parts */
-	cis = rcommunicator_shmem_init(pipefd[0], pipefd[1], MaxProcs, shmem_ptr, shmem_size,
+	cis = rcommunicator_shmem_init(pipefd[0], pipefd[1], MaxProcs * MAX_INFLIGHT_ASYNC_REQUESTS, shmem_ptr, shmem_size,
 								   initial_file_cache_size, max_file_cache_size);
 }

@@ -442,6 +449,28 @@ communicator_new_init(void)
 	my_bs = rcommunicator_backend_init(cis, MyProcNumber);
 	cis = NULL;
 
+	/*
+	 * Check the status of all the request slots. A previous backend with the
+	 * same proc number might've left behind some prefetch requests or aborted
+	 * requests.
+	 */
+	my_start_slot_idx = MyProcNumber * MAX_INFLIGHT_ASYNC_REQUESTS;
+	my_end_slot_idx = my_start_slot_idx + MAX_INFLIGHT_ASYNC_REQUESTS;
+	my_next_slot_idx = my_start_slot_idx;
+
+	for (int idx = my_start_slot_idx; idx < my_end_slot_idx; idx++)
+	{
+		struct NeonIOResult result;
+
+		if (bcomm_get_request_slot_status(my_bs, idx))
+		{
+			elog(LOG, "processing leftover IO request from previous session at slot %d", idx);
+			wait_request_completion(idx, &result);
+
+			/* FIXME: log the result if it was an error */
+		}
+	}
+
 	/*
 	 * Arrange to clean up at backend exit.
 	 */
@@ -572,13 +601,17 @@
 	Assert(num_inflight_requests < MAX_INFLIGHT_ASYNC_REQUESTS);
 
-	request_idx = bcomm_start_io_request(my_bs, request, immediate_result_p);
+	request_idx = bcomm_start_io_request(my_bs, my_next_slot_idx, request, immediate_result_p);
 	if (request_idx == -1)
 	{
 		/* -1 means the request was satisfied immediately. */
 		elog(DEBUG4, "communicator request %lu was satisfied immediately",
 			 request->rel_exists.request_id);
 		return -1;
 	}
+	Assert(request_idx == my_next_slot_idx);
+	my_next_slot_idx++;
+	if (my_next_slot_idx == my_end_slot_idx)
+		my_next_slot_idx = my_start_slot_idx;
 
 	inflight_requests[num_inflight_requests] = request_idx;
 	num_inflight_requests++;
@@ -749,7 +782,7 @@ communicator_new_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumbe
 	process_inflight_requests();
 
 retry:
-	request_idx = bcomm_start_get_page_v_request(my_bs, &request, &cached_result);
+	request_idx = bcomm_start_get_page_v_request(my_bs, my_next_slot_idx, &request, &cached_result);
 	if (request_idx == -1)
 	{
 		bool		completed;
@@ -801,8 +834,17 @@ retry:
 		}
 		return;
 	}
+	Assert(request_idx == my_next_slot_idx);
+	my_next_slot_idx++;
+	if (my_next_slot_idx == my_end_slot_idx)
+		my_next_slot_idx = my_start_slot_idx;
+	inflight_requests[num_inflight_requests] = request_idx;
+	num_inflight_requests++;
 	wait_request_completion(request_idx, &result);
+	Assert(num_inflight_requests == 1);
+	Assert(inflight_requests[0] == request_idx);
+	num_inflight_requests = 0;
 
 	switch (result.tag)
 	{
 		case NeonIOResult_GetPageV: