From 47c099a0fb752125b12abd1efd3737607ffd6ad0 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 10 Jul 2025 14:52:16 +0300 Subject: [PATCH] Rename NeonIOHandle to NeonIORequestSlot All the code talks about "request slots", better to make the struct name reflect that. The "Handle" term was borrowed from Postgres v18 AIO implementation, from the similar handles or slots used to submit IO requests from backends to worker processes. But even though the idea is similar, it's a completely separate implementation and there's nothing else shared between them than the very high level design. --- pgxn/neon/communicator/src/backend_comms.rs | 42 +++++++++---------- .../communicator/src/backend_interface.rs | 16 +++---- pgxn/neon/communicator/src/init.rs | 12 +++--- .../src/worker_process/main_loop.rs | 4 +- 4 files changed, 37 insertions(+), 37 deletions(-) diff --git a/pgxn/neon/communicator/src/backend_comms.rs b/pgxn/neon/communicator/src/backend_comms.rs index 0423b4486e..5851bd0d7a 100644 --- a/pgxn/neon/communicator/src/backend_comms.rs +++ b/pgxn/neon/communicator/src/backend_comms.rs @@ -51,9 +51,9 @@ use atomic_enum::atomic_enum; /// has been submitted or when a response is ready. We only store the 'owner_procno' /// which can be used for waking up the backend on completion, but the wakeups are /// performed elsewhere. -pub struct NeonIOHandle { +pub struct NeonIORequestSlot { /// similar to PgAioHandleState - state: AtomicNeonIOHandleState, + state: AtomicNeonIORequestSlotState, /// The owning process's ProcNumber. The worker process uses this to set the process's /// latch on completion. @@ -77,23 +77,23 @@ pub struct NeonIOHandle { // The protocol described in the "Lifecycle of a request" section above ensures // the safe access to the fields -unsafe impl Send for NeonIOHandle {} -unsafe impl Sync for NeonIOHandle {} +unsafe impl Send for NeonIORequestSlot {} +unsafe impl Sync for NeonIORequestSlot {} -impl Default for NeonIOHandle { - fn default() -> NeonIOHandle { - NeonIOHandle { +impl Default for NeonIORequestSlot { + fn default() -> NeonIORequestSlot { + NeonIORequestSlot { owner_procno: AtomicI32::new(-1), request: UnsafeCell::new(NeonIORequest::Empty), result: UnsafeCell::new(NeonIOResult::Empty), - state: AtomicNeonIOHandleState::new(NeonIOHandleState::Idle), + state: AtomicNeonIORequestSlotState::new(NeonIORequestSlotState::Idle), } } } #[atomic_enum] #[derive(Eq, PartialEq)] -pub enum NeonIOHandleState { +pub enum NeonIORequestSlotState { Idle, /// backend is filling in the request @@ -111,7 +111,7 @@ pub enum NeonIOHandleState { Completed, } -impl NeonIOHandle { +impl NeonIORequestSlot { pub fn fill_request(&self, request: &NeonIORequest, proc_number: i32) { // Verify that the slot is in Idle state previously, and start filling it. // @@ -119,8 +119,8 @@ impl NeonIOHandle { // and try to use a slot that's already in use, we could fill the slot and // switch it directly from Idle to Submitted state. if let Err(s) = self.state.compare_exchange( - NeonIOHandleState::Idle, - NeonIOHandleState::Filling, + NeonIORequestSlotState::Idle, + NeonIORequestSlotState::Filling, Ordering::Relaxed, Ordering::Relaxed, ) { @@ -133,21 +133,21 @@ impl NeonIOHandle { self.owner_procno.store(proc_number, Ordering::Relaxed); unsafe { *self.request.get() = *request } self.state - .store(NeonIOHandleState::Submitted, Ordering::Release); + .store(NeonIORequestSlotState::Submitted, Ordering::Release); } - pub fn get_state(&self) -> NeonIOHandleState { + pub fn get_state(&self) -> NeonIORequestSlotState { self.state.load(Ordering::Relaxed) } pub fn try_get_result(&self) -> Option { // FIXME: ordering? let state = self.state.load(Ordering::Relaxed); - if state == NeonIOHandleState::Completed { + if state == NeonIORequestSlotState::Completed { // This fence synchronizes-with store/swap in `communicator_process_main_loop`. fence(Ordering::Acquire); let result = unsafe { *self.result.get() }; - self.state.store(NeonIOHandleState::Idle, Ordering::Relaxed); + self.state.store(NeonIORequestSlotState::Idle, Ordering::Relaxed); Some(result) } else { None @@ -161,8 +161,8 @@ impl NeonIOHandle { // already processing. That could be a flag somewhere in communicator's private // memory, for example. if let Err(s) = self.state.compare_exchange( - NeonIOHandleState::Submitted, - NeonIOHandleState::Processing, + NeonIORequestSlotState::Submitted, + NeonIORequestSlotState::Processing, Ordering::Relaxed, Ordering::Relaxed, ) { @@ -177,7 +177,7 @@ impl NeonIOHandle { } } -pub struct RequestProcessingGuard<'a>(&'a NeonIOHandle); +pub struct RequestProcessingGuard<'a>(&'a NeonIORequestSlot); unsafe impl<'a> Send for RequestProcessingGuard<'a> {} unsafe impl<'a> Sync for RequestProcessingGuard<'a> {} @@ -201,7 +201,7 @@ impl<'a> RequestProcessingGuard<'a> { let old_state = self .0 .state - .swap(NeonIOHandleState::Completed, Ordering::Release); - assert!(old_state == NeonIOHandleState::Processing); + .swap(NeonIORequestSlotState::Completed, Ordering::Release); + assert!(old_state == NeonIORequestSlotState::Processing); } } diff --git a/pgxn/neon/communicator/src/backend_interface.rs b/pgxn/neon/communicator/src/backend_interface.rs index fd0081e837..91ecf3f4c4 100644 --- a/pgxn/neon/communicator/src/backend_interface.rs +++ b/pgxn/neon/communicator/src/backend_interface.rs @@ -3,7 +3,7 @@ use std::os::fd::OwnedFd; -use crate::backend_comms::NeonIOHandle; +use crate::backend_comms::NeonIORequestSlot; use crate::init::CommunicatorInitStruct; use crate::integrated_cache::{BackendCacheReadOp, IntegratedCacheReadAccess}; use crate::neon_request::{CCachedGetPageVResult, COid}; @@ -12,7 +12,7 @@ use crate::neon_request::{NeonIORequest, NeonIOResult}; pub struct CommunicatorBackendStruct<'t> { my_proc_number: i32, - neon_request_slots: &'t [NeonIOHandle], + neon_request_slots: &'t [NeonIORequestSlot], submission_pipe_write_fd: OwnedFd, @@ -152,10 +152,10 @@ pub extern "C" fn bcomm_get_request_slot_status( bs: &mut CommunicatorBackendStruct, request_slot_idx: u32, ) -> bool { - use crate::backend_comms::NeonIOHandleState; + use crate::backend_comms::NeonIORequestSlotState; match bs.neon_request_slots[request_slot_idx as usize].get_state() { - NeonIOHandleState::Idle => false, - NeonIOHandleState::Filling => { + NeonIORequestSlotState::Idle => false, + NeonIORequestSlotState::Filling => { // 'false' would be the right result here. However, this // is a very transient state. The C code should never // leave a slot in this state, so if it sees that, @@ -166,9 +166,9 @@ pub extern "C" fn bcomm_get_request_slot_status( request_slot_idx ); } - NeonIOHandleState::Submitted => true, - NeonIOHandleState::Processing => true, - NeonIOHandleState::Completed => true, + NeonIORequestSlotState::Submitted => true, + NeonIORequestSlotState::Processing => true, + NeonIORequestSlotState::Completed => true, } } diff --git a/pgxn/neon/communicator/src/init.rs b/pgxn/neon/communicator/src/init.rs index 20bb4923e8..f5af93cc97 100644 --- a/pgxn/neon/communicator/src/init.rs +++ b/pgxn/neon/communicator/src/init.rs @@ -23,7 +23,7 @@ use std::mem; use std::mem::MaybeUninit; use std::os::fd::OwnedFd; -use crate::backend_comms::NeonIOHandle; +use crate::backend_comms::NeonIORequestSlot; use crate::integrated_cache::IntegratedCacheInitStruct; /// This struct is created in the postmaster process, and inherited to @@ -36,7 +36,7 @@ pub struct CommunicatorInitStruct { // Shared memory data structures pub num_neon_request_slots: u32, - pub neon_request_slots: &'static [NeonIOHandle], + pub neon_request_slots: &'static [NeonIORequestSlot], pub integrated_cache_init_struct: IntegratedCacheInitStruct<'static>, } @@ -56,7 +56,7 @@ impl std::fmt::Debug for CommunicatorInitStruct { pub extern "C" fn rcommunicator_shmem_size(num_neon_request_slots: u32) -> u64 { let mut size = 0; - size += mem::size_of::() * num_neon_request_slots as usize; + size += mem::size_of::() * num_neon_request_slots as usize; // For integrated_cache's Allocator. TODO: make this adjustable size += IntegratedCacheInitStruct::shmem_size(); @@ -80,16 +80,16 @@ pub extern "C" fn rcommunicator_shmem_init( unsafe { std::slice::from_raw_parts_mut(shmem_area_ptr, shmem_area_len as usize) }; let (neon_request_slots, remaining_area) = - alloc_array_from_slice::(shmem_area, num_neon_request_slots as usize); + alloc_array_from_slice::(shmem_area, num_neon_request_slots as usize); for slot in neon_request_slots.iter_mut() { - slot.write(NeonIOHandle::default()); + slot.write(NeonIORequestSlot::default()); } // 'neon_request_slots' is initialized now. (MaybeUninit::slice_assume_init_mut() is nightly-only // as of this writing.) let neon_request_slots = unsafe { - std::mem::transmute::<&mut [MaybeUninit], &mut [NeonIOHandle]>( + std::mem::transmute::<&mut [MaybeUninit], &mut [NeonIORequestSlot]>( neon_request_slots, ) }; diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index 3ae187ac16..43145f7f22 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -4,7 +4,7 @@ use std::os::fd::OwnedFd; use std::path::PathBuf; use std::str::FromStr as _; -use crate::backend_comms::NeonIOHandle; +use crate::backend_comms::NeonIORequestSlot; use crate::file_cache::FileCache; use crate::global_allocator::MyAllocatorCollector; use crate::init::CommunicatorInitStruct; @@ -36,7 +36,7 @@ pub struct CommunicatorWorkerProcessStruct<'a> { client: PageserverClient, /// Request slots that backends use to send IO requests to the communicator. - neon_request_slots: &'a [NeonIOHandle], + neon_request_slots: &'a [NeonIORequestSlot], /// Notification pipe. Backends use this to notify the communicator that a request is waiting to /// be processed in one of the request slots.