Move logic for picking request slot to the C code

With this refactoring, the Rust code deals with one giant array of
requests, and doesn't know that it's sliced up per backend
process. The C code is now responsible for slicing it up.

This also adds code to complete, at backend start, old IOs that were
started and left behind by a previous session. That was a little more
straightforward to do after the refactoring, which is why I tackled it
now.
This commit is contained in:
Heikki Linnakangas
2025-07-07 12:59:08 +03:00
parent e1b58d5d69
commit a79fd3bda7
5 changed files with 97 additions and 58 deletions

View File

@@ -165,6 +165,10 @@ impl NeonIOHandle {
.store(NeonIOHandleState::Submitted, Ordering::Release); .store(NeonIOHandleState::Submitted, Ordering::Release);
} }
/// Return the slot's current state.
///
/// Uses `Ordering::Relaxed`, so the returned value is only a snapshot:
/// it may be stale by the time the caller acts on it. Callers must
/// tolerate the state changing underneath them.
pub fn get_state(&self) -> NeonIOHandleState {
self.state.load(Ordering::Relaxed)
}
pub fn try_get_result(&self) -> Option<NeonIOResult> { pub fn try_get_result(&self) -> Option<NeonIOResult> {
// FIXME: ordering? // FIXME: ordering?
let state = self.state.load(Ordering::Relaxed); let state = self.state.load(Ordering::Relaxed);

View File

@@ -12,11 +12,6 @@ use crate::neon_request::{NeonIORequest, NeonIOResult};
pub struct CommunicatorBackendStruct<'t> { pub struct CommunicatorBackendStruct<'t> {
my_proc_number: i32, my_proc_number: i32,
next_request_slot_idx: u32,
my_start_idx: u32, // First request slot that belongs to this backend
my_end_idx: u32, // end + 1 request slot that belongs to this backend
neon_request_slots: &'t [NeonIOHandle], neon_request_slots: &'t [NeonIOHandle],
submission_pipe_write_fd: OwnedFd, submission_pipe_write_fd: OwnedFd,
@@ -31,24 +26,18 @@ pub extern "C" fn rcommunicator_backend_init(
cis: Box<CommunicatorInitStruct>, cis: Box<CommunicatorInitStruct>,
my_proc_number: i32, my_proc_number: i32,
) -> &'static mut CommunicatorBackendStruct<'static> { ) -> &'static mut CommunicatorBackendStruct<'static> {
if my_proc_number < 0 || my_proc_number as u32 >= cis.max_procs { if my_proc_number < 0 {
panic!( panic!(
"cannot attach to communicator shared memory with procnumber {} (max_procs {})", "cannot attach to communicator shared memory with procnumber {}",
my_proc_number, cis.max_procs, my_proc_number,
); );
} }
let start_idx = my_proc_number as u32 * cis.num_neon_request_slots_per_backend;
let end_idx = start_idx + cis.num_neon_request_slots_per_backend;
let integrated_cache = Box::leak(Box::new(cis.integrated_cache_init_struct.backend_init())); let integrated_cache = Box::leak(Box::new(cis.integrated_cache_init_struct.backend_init()));
let bs: &'static mut CommunicatorBackendStruct = let bs: &'static mut CommunicatorBackendStruct =
Box::leak(Box::new(CommunicatorBackendStruct { Box::leak(Box::new(CommunicatorBackendStruct {
my_proc_number, my_proc_number,
next_request_slot_idx: start_idx,
my_start_idx: start_idx,
my_end_idx: end_idx,
neon_request_slots: cis.neon_request_slots, neon_request_slots: cis.neon_request_slots,
submission_pipe_write_fd: cis.submission_pipe_write_fd, submission_pipe_write_fd: cis.submission_pipe_write_fd,
@@ -66,9 +55,11 @@ pub extern "C" fn rcommunicator_backend_init(
/// latch is set. /// latch is set.
/// ///
/// Safety: The C caller must ensure that the references are valid. /// Safety: The C caller must ensure that the references are valid.
/// The requested slot must be free, or this panics.
#[unsafe(no_mangle)] #[unsafe(no_mangle)]
pub extern "C" fn bcomm_start_io_request( pub extern "C" fn bcomm_start_io_request(
bs: &'_ mut CommunicatorBackendStruct, bs: &'_ mut CommunicatorBackendStruct,
slot_idx: i32,
request: &NeonIORequest, request: &NeonIORequest,
immediate_result_ptr: &mut NeonIOResult, immediate_result_ptr: &mut NeonIOResult,
) -> i32 { ) -> i32 {
@@ -83,7 +74,7 @@ pub extern "C" fn bcomm_start_io_request(
} }
// Create neon request and submit it // Create neon request and submit it
let slot_idx = bs.start_neon_io_request(request); bs.start_neon_io_request(slot_idx, request);
// Tell the communicator about it // Tell the communicator about it
bs.submit_request(slot_idx); bs.submit_request(slot_idx);
@@ -94,6 +85,7 @@ pub extern "C" fn bcomm_start_io_request(
#[unsafe(no_mangle)] #[unsafe(no_mangle)]
pub extern "C" fn bcomm_start_get_page_v_request( pub extern "C" fn bcomm_start_get_page_v_request(
bs: &mut CommunicatorBackendStruct, bs: &mut CommunicatorBackendStruct,
slot_idx: i32,
request: &NeonIORequest, request: &NeonIORequest,
immediate_result_ptr: &mut CCachedGetPageVResult, immediate_result_ptr: &mut CCachedGetPageVResult,
) -> i32 { ) -> i32 {
@@ -124,7 +116,7 @@ pub extern "C" fn bcomm_start_get_page_v_request(
} }
// Create neon request and submit it // Create neon request and submit it
let slot_idx = bs.start_neon_io_request(request); bs.start_neon_io_request(slot_idx, request);
// Tell the communicator about it // Tell the communicator about it
bs.submit_request(slot_idx); bs.submit_request(slot_idx);
@@ -151,6 +143,32 @@ pub extern "C" fn bcomm_poll_request_completion(
} }
} }
/// Report whether a request slot is currently in use.
///
/// Returns 'false' if the slot is Idle: the backend process has
/// ownership and may fill it with a new request.
/// Returns 'true' if the slot is busy (Submitted, Processing or
/// Completed) and should be polled for a result.
///
/// # Panics
///
/// Panics if the slot is in the transient Filling state, which the C
/// code must never leave a slot in (see comment in the match arm).
#[unsafe(no_mangle)]
pub extern "C" fn bcomm_get_request_slot_status(
bs: &mut CommunicatorBackendStruct,
request_slot_idx: u32,
) -> bool {
use crate::backend_comms::NeonIOHandleState;
// Snapshot of the slot's state; see get_state() for ordering caveats.
match bs.neon_request_slots[request_slot_idx as usize].get_state() {
NeonIOHandleState::Idle => false,
NeonIOHandleState::Filling => {
// 'false' would be the right result here. However, this
// is a very transient state. The C code should never
// leave a slot in this state, so if it sees that,
// something's gone wrong and it's not clear what to do
// with it.
panic!("unexpected Filling state in request slot {}", request_slot_idx);
},
NeonIOHandleState::Submitted => true,
NeonIOHandleState::Processing => true,
NeonIOHandleState::Completed => true,
}
}
// LFC functions // LFC functions
/// Finish a local file cache read /// Finish a local file cache read
@@ -206,22 +224,11 @@ impl<'t> CommunicatorBackendStruct<'t> {
/// Note: there's no guarantee on when the communicator might pick it up. You should ring /// Note: there's no guarantee on when the communicator might pick it up. You should ring
/// the doorbell. But it might pick it up immediately. /// the doorbell. But it might pick it up immediately.
pub(crate) fn start_neon_io_request(&mut self, request: &NeonIORequest) -> i32 { ///
/// The slot must be free, or this panics.
pub(crate) fn start_neon_io_request(&mut self, request_slot_idx: i32, request: &NeonIORequest) {
let my_proc_number = self.my_proc_number; let my_proc_number = self.my_proc_number;
// Grab next free slot self.neon_request_slots[request_slot_idx as usize].fill_request(request, my_proc_number);
// FIXME: any guarantee that there will be any?
let idx = self.next_request_slot_idx;
let next_idx = idx + 1;
self.next_request_slot_idx = if next_idx == self.my_end_idx {
self.my_start_idx
} else {
next_idx
};
self.neon_request_slots[idx as usize].fill_request(request, my_proc_number);
idx as i32
} }
} }

View File

@@ -26,20 +26,15 @@ use std::os::fd::OwnedFd;
use crate::backend_comms::NeonIOHandle; use crate::backend_comms::NeonIOHandle;
use crate::integrated_cache::IntegratedCacheInitStruct; use crate::integrated_cache::IntegratedCacheInitStruct;
const NUM_NEON_REQUEST_SLOTS_PER_BACKEND: u32 = 5;
/// This struct is created in the postmaster process, and inherited to /// This struct is created in the postmaster process, and inherited to
/// the communicator process and all backend processes through fork() /// the communicator process and all backend processes through fork()
#[repr(C)] #[repr(C)]
pub struct CommunicatorInitStruct { pub struct CommunicatorInitStruct {
#[allow(dead_code)]
pub max_procs: u32,
pub submission_pipe_read_fd: OwnedFd, pub submission_pipe_read_fd: OwnedFd,
pub submission_pipe_write_fd: OwnedFd, pub submission_pipe_write_fd: OwnedFd,
// Shared memory data structures // Shared memory data structures
pub num_neon_request_slots_per_backend: u32, pub num_neon_request_slots: u32,
pub neon_request_slots: &'static [NeonIOHandle], pub neon_request_slots: &'static [NeonIOHandle],
@@ -49,12 +44,11 @@ pub struct CommunicatorInitStruct {
impl std::fmt::Debug for CommunicatorInitStruct { impl std::fmt::Debug for CommunicatorInitStruct {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
fmt.debug_struct("CommunicatorInitStruct") fmt.debug_struct("CommunicatorInitStruct")
.field("max_procs", &self.max_procs)
.field("submission_pipe_read_fd", &self.submission_pipe_read_fd) .field("submission_pipe_read_fd", &self.submission_pipe_read_fd)
.field("submission_pipe_write_fd", &self.submission_pipe_write_fd) .field("submission_pipe_write_fd", &self.submission_pipe_write_fd)
.field( .field(
"num_neon_request_slots_per_backend", "num_neon_request_slots",
&self.num_neon_request_slots_per_backend, &self.num_neon_request_slots,
) )
.field("neon_request_slots length", &self.neon_request_slots.len()) .field("neon_request_slots length", &self.neon_request_slots.len())
.finish() .finish()
@@ -62,14 +56,13 @@ impl std::fmt::Debug for CommunicatorInitStruct {
} }
#[unsafe(no_mangle)] #[unsafe(no_mangle)]
pub extern "C" fn rcommunicator_shmem_size(max_procs: u32) -> u64 { pub extern "C" fn rcommunicator_shmem_size(num_neon_request_slots: u32) -> u64 {
let mut size = 0; let mut size = 0;
let num_neon_request_slots = max_procs * NUM_NEON_REQUEST_SLOTS_PER_BACKEND;
size += mem::size_of::<NeonIOHandle>() * num_neon_request_slots as usize; size += mem::size_of::<NeonIOHandle>() * num_neon_request_slots as usize;
// For integrated_cache's Allocator. TODO: make this adjustable // For integrated_cache's Allocator. TODO: make this adjustable
size += IntegratedCacheInitStruct::shmem_size(max_procs); size += IntegratedCacheInitStruct::shmem_size();
size as u64 size as u64
} }
@@ -80,7 +73,7 @@ pub extern "C" fn rcommunicator_shmem_size(max_procs: u32) -> u64 {
pub extern "C" fn rcommunicator_shmem_init( pub extern "C" fn rcommunicator_shmem_init(
submission_pipe_read_fd: c_int, submission_pipe_read_fd: c_int,
submission_pipe_write_fd: c_int, submission_pipe_write_fd: c_int,
max_procs: u32, num_neon_request_slots: u32,
shmem_area_ptr: *mut MaybeUninit<u8>, shmem_area_ptr: *mut MaybeUninit<u8>,
shmem_area_len: u64, shmem_area_len: u64,
initial_file_cache_size: u64, initial_file_cache_size: u64,
@@ -89,12 +82,8 @@ pub extern "C" fn rcommunicator_shmem_init(
let shmem_area: &'static mut [MaybeUninit<u8>] = let shmem_area: &'static mut [MaybeUninit<u8>] =
unsafe { std::slice::from_raw_parts_mut(shmem_area_ptr, shmem_area_len as usize) }; unsafe { std::slice::from_raw_parts_mut(shmem_area_ptr, shmem_area_len as usize) };
// Carve out the request slots from the shmem area and initialize them
let num_neon_request_slots_per_backend = NUM_NEON_REQUEST_SLOTS_PER_BACKEND as usize;
let num_neon_request_slots = max_procs as usize * num_neon_request_slots_per_backend;
let (neon_request_slots, remaining_area) = let (neon_request_slots, remaining_area) =
alloc_array_from_slice::<NeonIOHandle>(shmem_area, num_neon_request_slots); alloc_array_from_slice::<NeonIOHandle>(shmem_area, num_neon_request_slots as usize);
for slot in neon_request_slots.iter_mut() { for slot in neon_request_slots.iter_mut() {
slot.write(NeonIOHandle::default()); slot.write(NeonIOHandle::default());
@@ -110,7 +99,6 @@ pub extern "C" fn rcommunicator_shmem_init(
// Give the rest of the area to the integrated cache // Give the rest of the area to the integrated cache
let integrated_cache_init_struct = IntegratedCacheInitStruct::shmem_init( let integrated_cache_init_struct = IntegratedCacheInitStruct::shmem_init(
max_procs,
remaining_area, remaining_area,
initial_file_cache_size, initial_file_cache_size,
max_file_cache_size, max_file_cache_size,
@@ -125,11 +113,10 @@ pub extern "C" fn rcommunicator_shmem_init(
}; };
let cis: &'static mut CommunicatorInitStruct = Box::leak(Box::new(CommunicatorInitStruct { let cis: &'static mut CommunicatorInitStruct = Box::leak(Box::new(CommunicatorInitStruct {
max_procs,
submission_pipe_read_fd, submission_pipe_read_fd,
submission_pipe_write_fd, submission_pipe_write_fd,
num_neon_request_slots_per_backend: NUM_NEON_REQUEST_SLOTS_PER_BACKEND, num_neon_request_slots,
neon_request_slots, neon_request_slots,
integrated_cache_init_struct, integrated_cache_init_struct,

View File

@@ -78,7 +78,7 @@ pub struct IntegratedCacheReadAccess<'t> {
impl<'t> IntegratedCacheInitStruct<'t> { impl<'t> IntegratedCacheInitStruct<'t> {
/// Return the desired size in bytes of the fixed-size shared memory area to reserve for the /// Return the desired size in bytes of the fixed-size shared memory area to reserve for the
/// integrated cache. /// integrated cache.
pub fn shmem_size(_max_procs: u32) -> usize { pub fn shmem_size() -> usize {
// The relsize cache is fixed-size. The block map is allocated in a separate resizable // The relsize cache is fixed-size. The block map is allocated in a separate resizable
// area. // area.
HashMapInit::<RelKey, RelEntry>::estimate_size(RELSIZE_CACHE_SIZE) HashMapInit::<RelKey, RelEntry>::estimate_size(RELSIZE_CACHE_SIZE)
@@ -87,7 +87,6 @@ impl<'t> IntegratedCacheInitStruct<'t> {
/// Initialize the shared memory segment. This runs once in postmaster. Returns a struct which /// Initialize the shared memory segment. This runs once in postmaster. Returns a struct which
/// will be inherited by all processes through fork. /// will be inherited by all processes through fork.
pub fn shmem_init( pub fn shmem_init(
_max_procs: u32,
shmem_area: &'t mut [MaybeUninit<u8>], shmem_area: &'t mut [MaybeUninit<u8>],
initial_file_cache_size: u64, initial_file_cache_size: u64,
max_file_cache_size: u64, max_file_cache_size: u64,

View File

@@ -115,6 +115,10 @@ static CommunicatorShmemData *communicator_shmem_ptr;
static int inflight_requests[MAX_INFLIGHT_ASYNC_REQUESTS]; static int inflight_requests[MAX_INFLIGHT_ASYNC_REQUESTS];
static int num_inflight_requests = 0; static int num_inflight_requests = 0;
static int my_start_slot_idx;
static int my_end_slot_idx;
static int my_next_slot_idx;
static int start_request(NeonIORequest *request, struct NeonIOResult *immediate_result_p); static int start_request(NeonIORequest *request, struct NeonIOResult *immediate_result_p);
static void wait_request_completion(int request_idx, struct NeonIOResult *result_p); static void wait_request_completion(int request_idx, struct NeonIOResult *result_p);
static void perform_request(NeonIORequest *request, struct NeonIOResult *result_p); static void perform_request(NeonIORequest *request, struct NeonIOResult *result_p);
@@ -189,14 +193,17 @@ static size_t
communicator_new_shmem_size(void) communicator_new_shmem_size(void)
{ {
size_t size = 0; size_t size = 0;
int num_request_slots;
size += MAXALIGN( size += MAXALIGN(
offsetof(CommunicatorShmemData, backends) + offsetof(CommunicatorShmemData, backends) +
MaxProcs * sizeof(CommunicatorShmemPerBackendData) MaxProcs * sizeof(CommunicatorShmemPerBackendData)
); );
num_request_slots = MaxProcs * MAX_INFLIGHT_ASYNC_REQUESTS;
/* space needed by the rust code */ /* space needed by the rust code */
size += rcommunicator_shmem_size(MaxProcs); size += rcommunicator_shmem_size(num_request_slots);
return size; return size;
} }
@@ -256,7 +263,7 @@ communicator_new_shmem_startup(void)
max_file_cache_size = 100; max_file_cache_size = 100;
/* Initialize the rust-managed parts */ /* Initialize the rust-managed parts */
cis = rcommunicator_shmem_init(pipefd[0], pipefd[1], MaxProcs, shmem_ptr, shmem_size, cis = rcommunicator_shmem_init(pipefd[0], pipefd[1], MaxProcs * MAX_INFLIGHT_ASYNC_REQUESTS, shmem_ptr, shmem_size,
initial_file_cache_size, max_file_cache_size); initial_file_cache_size, max_file_cache_size);
} }
@@ -442,6 +449,28 @@ communicator_new_init(void)
my_bs = rcommunicator_backend_init(cis, MyProcNumber); my_bs = rcommunicator_backend_init(cis, MyProcNumber);
cis = NULL; cis = NULL;
/*
* Check the status of all the request slots. A previous backend with the
* same proc number might've left behind some prefetch requests or aborted
* requests
*/
my_start_slot_idx = MyProcNumber * MAX_INFLIGHT_ASYNC_REQUESTS;
my_end_slot_idx = my_start_slot_idx + MAX_INFLIGHT_ASYNC_REQUESTS;
my_next_slot_idx = my_start_slot_idx;
for (int idx = my_start_slot_idx; idx < my_end_slot_idx; idx++)
{
struct NeonIOResult result;
if (bcomm_get_request_slot_status(my_bs, idx))
{
elog(LOG, "processing leftover IO request from previous session at slot %d", idx);
wait_request_completion(idx, &result);
/* FIXME: log the result if it was an error */
}
}
/* /*
* Arrange to clean up at backend exit. * Arrange to clean up at backend exit.
*/ */
@@ -572,13 +601,17 @@ start_request(NeonIORequest *request, struct NeonIOResult *immediate_result_p)
Assert(num_inflight_requests < MAX_INFLIGHT_ASYNC_REQUESTS); Assert(num_inflight_requests < MAX_INFLIGHT_ASYNC_REQUESTS);
request_idx = bcomm_start_io_request(my_bs, request, immediate_result_p); request_idx = bcomm_start_io_request(my_bs, my_next_slot_idx, request, immediate_result_p);
if (request_idx == -1) if (request_idx == -1)
{ {
/* -1 means the request was satisfied immediately. */ /* -1 means the request was satisfied immediately. */
elog(DEBUG4, "communicator request %lu was satisfied immediately", request->rel_exists.request_id); elog(DEBUG4, "communicator request %lu was satisfied immediately", request->rel_exists.request_id);
return -1; return -1;
} }
Assert(request_idx == my_next_slot_idx);
my_next_slot_idx++;
if (my_next_slot_idx == my_end_slot_idx)
my_next_slot_idx = my_start_slot_idx;
inflight_requests[num_inflight_requests] = request_idx; inflight_requests[num_inflight_requests] = request_idx;
num_inflight_requests++; num_inflight_requests++;
@@ -749,7 +782,7 @@ communicator_new_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumbe
process_inflight_requests(); process_inflight_requests();
retry: retry:
request_idx = bcomm_start_get_page_v_request(my_bs, &request, &cached_result); request_idx = bcomm_start_get_page_v_request(my_bs, my_next_slot_idx, &request, &cached_result);
if (request_idx == -1) if (request_idx == -1)
{ {
bool completed; bool completed;
@@ -801,8 +834,17 @@ retry:
} }
return; return;
} }
Assert(request_idx == my_next_slot_idx);
my_next_slot_idx++;
if (my_next_slot_idx == my_end_slot_idx)
my_next_slot_idx = my_start_slot_idx;
inflight_requests[num_inflight_requests] = request_idx;
num_inflight_requests++;
wait_request_completion(request_idx, &result); wait_request_completion(request_idx, &result);
Assert(num_inflight_requests == 1);
Assert(inflight_requests[0] == request_idx);
num_inflight_requests = 0;
switch (result.tag) switch (result.tag)
{ {
case NeonIOResult_GetPageV: case NeonIOResult_GetPageV: