Rename NeonIOHandle to NeonIORequestSlot

All the code talks about "request slots", better to make the struct
name reflect that. The "Handle" term was borrowed from Postgres v18
AIO implementation, from the similar handles or slots used to submit
IO requests from backends to worker processes. But even though the
idea is similar, it's a completely separate implementation and there's
nothing else shared between them than the very high level
design.
This commit is contained in:
Heikki Linnakangas
2025-07-10 14:52:16 +03:00
parent b67e8f2edc
commit 47c099a0fb
4 changed files with 37 additions and 37 deletions

View File

@@ -51,9 +51,9 @@ use atomic_enum::atomic_enum;
/// has been submitted or when a response is ready. We only store the 'owner_procno'
/// which can be used for waking up the backend on completion, but the wakeups are
/// performed elsewhere.
pub struct NeonIOHandle {
pub struct NeonIORequestSlot {
/// similar to PgAioHandleState
state: AtomicNeonIOHandleState,
state: AtomicNeonIORequestSlotState,
/// The owning process's ProcNumber. The worker process uses this to set the process's
/// latch on completion.
@@ -77,23 +77,23 @@ pub struct NeonIOHandle {
// The protocol described in the "Lifecycle of a request" section above ensures
// the safe access to the fields
unsafe impl Send for NeonIOHandle {}
unsafe impl Sync for NeonIOHandle {}
unsafe impl Send for NeonIORequestSlot {}
unsafe impl Sync for NeonIORequestSlot {}
impl Default for NeonIOHandle {
fn default() -> NeonIOHandle {
NeonIOHandle {
impl Default for NeonIORequestSlot {
fn default() -> NeonIORequestSlot {
NeonIORequestSlot {
owner_procno: AtomicI32::new(-1),
request: UnsafeCell::new(NeonIORequest::Empty),
result: UnsafeCell::new(NeonIOResult::Empty),
state: AtomicNeonIOHandleState::new(NeonIOHandleState::Idle),
state: AtomicNeonIORequestSlotState::new(NeonIORequestSlotState::Idle),
}
}
}
#[atomic_enum]
#[derive(Eq, PartialEq)]
pub enum NeonIOHandleState {
pub enum NeonIORequestSlotState {
Idle,
/// backend is filling in the request
@@ -111,7 +111,7 @@ pub enum NeonIOHandleState {
Completed,
}
impl NeonIOHandle {
impl NeonIORequestSlot {
pub fn fill_request(&self, request: &NeonIORequest, proc_number: i32) {
// Verify that the slot is in Idle state previously, and start filling it.
//
@@ -119,8 +119,8 @@ impl NeonIOHandle {
// and try to use a slot that's already in use, we could fill the slot and
// switch it directly from Idle to Submitted state.
if let Err(s) = self.state.compare_exchange(
NeonIOHandleState::Idle,
NeonIOHandleState::Filling,
NeonIORequestSlotState::Idle,
NeonIORequestSlotState::Filling,
Ordering::Relaxed,
Ordering::Relaxed,
) {
@@ -133,21 +133,21 @@ impl NeonIOHandle {
self.owner_procno.store(proc_number, Ordering::Relaxed);
unsafe { *self.request.get() = *request }
self.state
.store(NeonIOHandleState::Submitted, Ordering::Release);
.store(NeonIORequestSlotState::Submitted, Ordering::Release);
}
pub fn get_state(&self) -> NeonIOHandleState {
pub fn get_state(&self) -> NeonIORequestSlotState {
self.state.load(Ordering::Relaxed)
}
pub fn try_get_result(&self) -> Option<NeonIOResult> {
// FIXME: ordering?
let state = self.state.load(Ordering::Relaxed);
if state == NeonIOHandleState::Completed {
if state == NeonIORequestSlotState::Completed {
// This fence synchronizes-with store/swap in `communicator_process_main_loop`.
fence(Ordering::Acquire);
let result = unsafe { *self.result.get() };
self.state.store(NeonIOHandleState::Idle, Ordering::Relaxed);
self.state.store(NeonIORequestSlotState::Idle, Ordering::Relaxed);
Some(result)
} else {
None
@@ -161,8 +161,8 @@ impl NeonIOHandle {
// already processing. That could be a flag somewhere in communicator's private
// memory, for example.
if let Err(s) = self.state.compare_exchange(
NeonIOHandleState::Submitted,
NeonIOHandleState::Processing,
NeonIORequestSlotState::Submitted,
NeonIORequestSlotState::Processing,
Ordering::Relaxed,
Ordering::Relaxed,
) {
@@ -177,7 +177,7 @@ impl NeonIOHandle {
}
}
pub struct RequestProcessingGuard<'a>(&'a NeonIOHandle);
pub struct RequestProcessingGuard<'a>(&'a NeonIORequestSlot);
unsafe impl<'a> Send for RequestProcessingGuard<'a> {}
unsafe impl<'a> Sync for RequestProcessingGuard<'a> {}
@@ -201,7 +201,7 @@ impl<'a> RequestProcessingGuard<'a> {
let old_state = self
.0
.state
.swap(NeonIOHandleState::Completed, Ordering::Release);
assert!(old_state == NeonIOHandleState::Processing);
.swap(NeonIORequestSlotState::Completed, Ordering::Release);
assert!(old_state == NeonIORequestSlotState::Processing);
}
}

View File

@@ -3,7 +3,7 @@
use std::os::fd::OwnedFd;
use crate::backend_comms::NeonIOHandle;
use crate::backend_comms::NeonIORequestSlot;
use crate::init::CommunicatorInitStruct;
use crate::integrated_cache::{BackendCacheReadOp, IntegratedCacheReadAccess};
use crate::neon_request::{CCachedGetPageVResult, COid};
@@ -12,7 +12,7 @@ use crate::neon_request::{NeonIORequest, NeonIOResult};
pub struct CommunicatorBackendStruct<'t> {
my_proc_number: i32,
neon_request_slots: &'t [NeonIOHandle],
neon_request_slots: &'t [NeonIORequestSlot],
submission_pipe_write_fd: OwnedFd,
@@ -152,10 +152,10 @@ pub extern "C" fn bcomm_get_request_slot_status(
bs: &mut CommunicatorBackendStruct,
request_slot_idx: u32,
) -> bool {
use crate::backend_comms::NeonIOHandleState;
use crate::backend_comms::NeonIORequestSlotState;
match bs.neon_request_slots[request_slot_idx as usize].get_state() {
NeonIOHandleState::Idle => false,
NeonIOHandleState::Filling => {
NeonIORequestSlotState::Idle => false,
NeonIORequestSlotState::Filling => {
// 'false' would be the right result here. However, this
// is a very transient state. The C code should never
// leave a slot in this state, so if it sees that,
@@ -166,9 +166,9 @@ pub extern "C" fn bcomm_get_request_slot_status(
request_slot_idx
);
}
NeonIOHandleState::Submitted => true,
NeonIOHandleState::Processing => true,
NeonIOHandleState::Completed => true,
NeonIORequestSlotState::Submitted => true,
NeonIORequestSlotState::Processing => true,
NeonIORequestSlotState::Completed => true,
}
}

View File

@@ -23,7 +23,7 @@ use std::mem;
use std::mem::MaybeUninit;
use std::os::fd::OwnedFd;
use crate::backend_comms::NeonIOHandle;
use crate::backend_comms::NeonIORequestSlot;
use crate::integrated_cache::IntegratedCacheInitStruct;
/// This struct is created in the postmaster process, and inherited to
@@ -36,7 +36,7 @@ pub struct CommunicatorInitStruct {
// Shared memory data structures
pub num_neon_request_slots: u32,
pub neon_request_slots: &'static [NeonIOHandle],
pub neon_request_slots: &'static [NeonIORequestSlot],
pub integrated_cache_init_struct: IntegratedCacheInitStruct<'static>,
}
@@ -56,7 +56,7 @@ impl std::fmt::Debug for CommunicatorInitStruct {
pub extern "C" fn rcommunicator_shmem_size(num_neon_request_slots: u32) -> u64 {
let mut size = 0;
size += mem::size_of::<NeonIOHandle>() * num_neon_request_slots as usize;
size += mem::size_of::<NeonIORequestSlot>() * num_neon_request_slots as usize;
// For integrated_cache's Allocator. TODO: make this adjustable
size += IntegratedCacheInitStruct::shmem_size();
@@ -80,16 +80,16 @@ pub extern "C" fn rcommunicator_shmem_init(
unsafe { std::slice::from_raw_parts_mut(shmem_area_ptr, shmem_area_len as usize) };
let (neon_request_slots, remaining_area) =
alloc_array_from_slice::<NeonIOHandle>(shmem_area, num_neon_request_slots as usize);
alloc_array_from_slice::<NeonIORequestSlot>(shmem_area, num_neon_request_slots as usize);
for slot in neon_request_slots.iter_mut() {
slot.write(NeonIOHandle::default());
slot.write(NeonIORequestSlot::default());
}
// 'neon_request_slots' is initialized now. (MaybeUninit::slice_assume_init_mut() is nightly-only
// as of this writing.)
let neon_request_slots = unsafe {
std::mem::transmute::<&mut [MaybeUninit<NeonIOHandle>], &mut [NeonIOHandle]>(
std::mem::transmute::<&mut [MaybeUninit<NeonIORequestSlot>], &mut [NeonIORequestSlot]>(
neon_request_slots,
)
};

View File

@@ -4,7 +4,7 @@ use std::os::fd::OwnedFd;
use std::path::PathBuf;
use std::str::FromStr as _;
use crate::backend_comms::NeonIOHandle;
use crate::backend_comms::NeonIORequestSlot;
use crate::file_cache::FileCache;
use crate::global_allocator::MyAllocatorCollector;
use crate::init::CommunicatorInitStruct;
@@ -36,7 +36,7 @@ pub struct CommunicatorWorkerProcessStruct<'a> {
client: PageserverClient,
/// Request slots that backends use to send IO requests to the communicator.
neon_request_slots: &'a [NeonIOHandle],
neon_request_slots: &'a [NeonIORequestSlot],
/// Notification pipe. Backends use this to notify the communicator that a request is waiting to
/// be processed in one of the request slots.