diff --git a/pgxn/neon/communicator/src/backend_comms.rs b/pgxn/neon/communicator/src/backend_comms.rs index 0423b4486e..5851bd0d7a 100644 --- a/pgxn/neon/communicator/src/backend_comms.rs +++ b/pgxn/neon/communicator/src/backend_comms.rs @@ -51,9 +51,9 @@ use atomic_enum::atomic_enum; /// has been submitted or when a response is ready. We only store the 'owner_procno' /// which can be used for waking up the backend on completion, but the wakeups are /// performed elsewhere. -pub struct NeonIOHandle { +pub struct NeonIORequestSlot { /// similar to PgAioHandleState - state: AtomicNeonIOHandleState, + state: AtomicNeonIORequestSlotState, /// The owning process's ProcNumber. The worker process uses this to set the process's /// latch on completion. @@ -77,23 +77,23 @@ pub struct NeonIOHandle { // The protocol described in the "Lifecycle of a request" section above ensures // the safe access to the fields -unsafe impl Send for NeonIOHandle {} -unsafe impl Sync for NeonIOHandle {} +unsafe impl Send for NeonIORequestSlot {} +unsafe impl Sync for NeonIORequestSlot {} -impl Default for NeonIOHandle { - fn default() -> NeonIOHandle { - NeonIOHandle { +impl Default for NeonIORequestSlot { + fn default() -> NeonIORequestSlot { + NeonIORequestSlot { owner_procno: AtomicI32::new(-1), request: UnsafeCell::new(NeonIORequest::Empty), result: UnsafeCell::new(NeonIOResult::Empty), - state: AtomicNeonIOHandleState::new(NeonIOHandleState::Idle), + state: AtomicNeonIORequestSlotState::new(NeonIORequestSlotState::Idle), } } } #[atomic_enum] #[derive(Eq, PartialEq)] -pub enum NeonIOHandleState { +pub enum NeonIORequestSlotState { Idle, /// backend is filling in the request @@ -111,7 +111,7 @@ pub enum NeonIOHandleState { Completed, } -impl NeonIOHandle { +impl NeonIORequestSlot { pub fn fill_request(&self, request: &NeonIORequest, proc_number: i32) { // Verify that the slot is in Idle state previously, and start filling it. // @@ -119,8 +119,8 @@ impl NeonIOHandle { // and try to use a slot that's already in use, we could fill the slot and // switch it directly from Idle to Submitted state. if let Err(s) = self.state.compare_exchange( - NeonIOHandleState::Idle, - NeonIOHandleState::Filling, + NeonIORequestSlotState::Idle, + NeonIORequestSlotState::Filling, Ordering::Relaxed, Ordering::Relaxed, ) { @@ -133,21 +133,21 @@ impl NeonIOHandle { self.owner_procno.store(proc_number, Ordering::Relaxed); unsafe { *self.request.get() = *request } self.state - .store(NeonIOHandleState::Submitted, Ordering::Release); + .store(NeonIORequestSlotState::Submitted, Ordering::Release); } - pub fn get_state(&self) -> NeonIOHandleState { + pub fn get_state(&self) -> NeonIORequestSlotState { self.state.load(Ordering::Relaxed) } pub fn try_get_result(&self) -> Option { // FIXME: ordering? let state = self.state.load(Ordering::Relaxed); - if state == NeonIOHandleState::Completed { + if state == NeonIORequestSlotState::Completed { // This fence synchronizes-with store/swap in `communicator_process_main_loop`. fence(Ordering::Acquire); let result = unsafe { *self.result.get() }; - self.state.store(NeonIOHandleState::Idle, Ordering::Relaxed); + self.state.store(NeonIORequestSlotState::Idle, Ordering::Relaxed); Some(result) } else { None @@ -161,8 +161,8 @@ impl NeonIOHandle { // already processing. That could be a flag somewhere in communicator's private // memory, for example. if let Err(s) = self.state.compare_exchange( - NeonIOHandleState::Submitted, - NeonIOHandleState::Processing, + NeonIORequestSlotState::Submitted, + NeonIORequestSlotState::Processing, Ordering::Relaxed, Ordering::Relaxed, ) { @@ -177,7 +177,7 @@ impl NeonIOHandle { } } -pub struct RequestProcessingGuard<'a>(&'a NeonIOHandle); +pub struct RequestProcessingGuard<'a>(&'a NeonIORequestSlot); unsafe impl<'a> Send for RequestProcessingGuard<'a> {} unsafe impl<'a> Sync for RequestProcessingGuard<'a> {} @@ -201,7 +201,7 @@ impl<'a> RequestProcessingGuard<'a> { let old_state = self .0 .state - .swap(NeonIOHandleState::Completed, Ordering::Release); - assert!(old_state == NeonIOHandleState::Processing); + .swap(NeonIORequestSlotState::Completed, Ordering::Release); + assert!(old_state == NeonIORequestSlotState::Processing); } } diff --git a/pgxn/neon/communicator/src/backend_interface.rs b/pgxn/neon/communicator/src/backend_interface.rs index fd0081e837..91ecf3f4c4 100644 --- a/pgxn/neon/communicator/src/backend_interface.rs +++ b/pgxn/neon/communicator/src/backend_interface.rs @@ -3,7 +3,7 @@ use std::os::fd::OwnedFd; -use crate::backend_comms::NeonIOHandle; +use crate::backend_comms::NeonIORequestSlot; use crate::init::CommunicatorInitStruct; use crate::integrated_cache::{BackendCacheReadOp, IntegratedCacheReadAccess}; use crate::neon_request::{CCachedGetPageVResult, COid}; @@ -12,7 +12,7 @@ use crate::neon_request::{NeonIORequest, NeonIOResult}; pub struct CommunicatorBackendStruct<'t> { my_proc_number: i32, - neon_request_slots: &'t [NeonIOHandle], + neon_request_slots: &'t [NeonIORequestSlot], submission_pipe_write_fd: OwnedFd, @@ -152,10 +152,10 @@ pub extern "C" fn bcomm_get_request_slot_status( bs: &mut CommunicatorBackendStruct, request_slot_idx: u32, ) -> bool { - use crate::backend_comms::NeonIOHandleState; + use crate::backend_comms::NeonIORequestSlotState; match bs.neon_request_slots[request_slot_idx as usize].get_state() { - NeonIOHandleState::Idle => false, - NeonIOHandleState::Filling => { + NeonIORequestSlotState::Idle => false, + NeonIORequestSlotState::Filling => { // 'false' would be the right result here. However, this // is a very transient state. The C code should never // leave a slot in this state, so if it sees that, @@ -166,9 +166,9 @@ pub extern "C" fn bcomm_get_request_slot_status( request_slot_idx ); } - NeonIOHandleState::Submitted => true, - NeonIOHandleState::Processing => true, - NeonIOHandleState::Completed => true, + NeonIORequestSlotState::Submitted => true, + NeonIORequestSlotState::Processing => true, + NeonIORequestSlotState::Completed => true, } } diff --git a/pgxn/neon/communicator/src/init.rs b/pgxn/neon/communicator/src/init.rs index 20bb4923e8..f5af93cc97 100644 --- a/pgxn/neon/communicator/src/init.rs +++ b/pgxn/neon/communicator/src/init.rs @@ -23,7 +23,7 @@ use std::mem; use std::mem::MaybeUninit; use std::os::fd::OwnedFd; -use crate::backend_comms::NeonIOHandle; +use crate::backend_comms::NeonIORequestSlot; use crate::integrated_cache::IntegratedCacheInitStruct; /// This struct is created in the postmaster process, and inherited to @@ -36,7 +36,7 @@ pub struct CommunicatorInitStruct { // Shared memory data structures pub num_neon_request_slots: u32, - pub neon_request_slots: &'static [NeonIOHandle], + pub neon_request_slots: &'static [NeonIORequestSlot], pub integrated_cache_init_struct: IntegratedCacheInitStruct<'static>, } @@ -56,7 +56,7 @@ impl std::fmt::Debug for CommunicatorInitStruct { pub extern "C" fn rcommunicator_shmem_size(num_neon_request_slots: u32) -> u64 { let mut size = 0; - size += mem::size_of::() * num_neon_request_slots as usize; + size += mem::size_of::() * num_neon_request_slots as usize; // For integrated_cache's Allocator. TODO: make this adjustable size += IntegratedCacheInitStruct::shmem_size(); @@ -80,16 +80,16 @@ pub extern "C" fn rcommunicator_shmem_init( unsafe { std::slice::from_raw_parts_mut(shmem_area_ptr, shmem_area_len as usize) }; let (neon_request_slots, remaining_area) = - alloc_array_from_slice::(shmem_area, num_neon_request_slots as usize); + alloc_array_from_slice::(shmem_area, num_neon_request_slots as usize); for slot in neon_request_slots.iter_mut() { - slot.write(NeonIOHandle::default()); + slot.write(NeonIORequestSlot::default()); } // 'neon_request_slots' is initialized now. (MaybeUninit::slice_assume_init_mut() is nightly-only // as of this writing.) let neon_request_slots = unsafe { - std::mem::transmute::<&mut [MaybeUninit], &mut [NeonIOHandle]>( + std::mem::transmute::<&mut [MaybeUninit], &mut [NeonIORequestSlot]>( neon_request_slots, ) }; diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index 3ae187ac16..43145f7f22 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -4,7 +4,7 @@ use std::os::fd::OwnedFd; use std::path::PathBuf; use std::str::FromStr as _; -use crate::backend_comms::NeonIOHandle; +use crate::backend_comms::NeonIORequestSlot; use crate::file_cache::FileCache; use crate::global_allocator::MyAllocatorCollector; use crate::init::CommunicatorInitStruct; @@ -36,7 +36,7 @@ pub struct CommunicatorWorkerProcessStruct<'a> { client: PageserverClient, /// Request slots that backends use to send IO requests to the communicator. - neon_request_slots: &'a [NeonIOHandle], + neon_request_slots: &'a [NeonIORequestSlot], /// Notification pipe. Backends use this to notify the communicator that a request is waiting to /// be processed in one of the request slots.