Minor cleanup

Tidy up and add some comments. Rename a few things for clarity.
Heikki Linnakangas
2025-07-04 20:26:21 +03:00
parent 6c398aeae7
commit 90d3c09c24
5 changed files with 48 additions and 38 deletions

View File

@@ -179,9 +179,8 @@ impl NeonIOHandle {
 }
 }
+/// Read the IO request from the slot indicated in the wakeup
 pub fn start_processing_request<'a>(&'a self) -> Option<RequestProcessingGuard<'a>> {
-// Read the IO request from the slot indicated in the wakeup
-//
 // XXX: using compare_exchange for this is not strictly necessary, as long as
 // the communicator process has _some_ means of tracking which requests it's
 // already processing. That could be a flag somewhere in communicator's private
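The XXX note above is about how the communicator claims a submitted slot before working on it. As a rough illustration only (a minimal sketch assuming a per-slot AtomicU32 state word; the constants, names and memory orderings here are made up, not the actual NeonIOHandle layout), the compare_exchange handshake it refers to looks something like this:

    use std::sync::atomic::{AtomicU32, Ordering};

    // Illustrative slot states; the real state machine in NeonIOHandle may differ.
    const SLOT_SUBMITTED: u32 = 1;
    const SLOT_PROCESSING: u32 = 2;

    struct SlotSketch {
        state: AtomicU32,
    }

    impl SlotSketch {
        // Try to claim the slot for processing. Returns false if the slot is not in
        // the SUBMITTED state, e.g. it is empty or another thread already claimed it.
        fn try_claim(&self) -> bool {
            self.state
                .compare_exchange(
                    SLOT_SUBMITTED,
                    SLOT_PROCESSING,
                    Ordering::Acquire, // observe the request written before submission
                    Ordering::Relaxed,
                )
                .is_ok()
        }
    }

    fn main() {
        let slot = SlotSketch { state: AtomicU32::new(SLOT_SUBMITTED) };
        assert!(slot.try_claim()); // first claim wins
        assert!(!slot.try_claim()); // already PROCESSING, so the claim fails
    }

As the comment says, a single consumer could track in-progress requests with a plain flag instead; the atomic claim only matters if more than one thread could try to pick up the same slot.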

View File

@@ -12,7 +12,7 @@ use crate::neon_request::{NeonIORequest, NeonIOResult};
 pub struct CommunicatorBackendStruct<'t> {
 my_proc_number: i32,
-next_neon_request_idx: u32,
+next_request_slot_idx: u32,
 my_start_idx: u32, // First request slot that belongs to this backend
 my_end_idx: u32, // end + 1 request slot that belongs to this backend
@@ -46,7 +46,7 @@ pub extern "C" fn rcommunicator_backend_init(
 let bs: &'static mut CommunicatorBackendStruct =
 Box::leak(Box::new(CommunicatorBackendStruct {
 my_proc_number,
-next_neon_request_idx: start_idx,
+next_request_slot_idx: start_idx,
 my_start_idx: start_idx,
 my_end_idx: end_idx,
 neon_request_slots: cis.neon_request_slots,
@@ -83,12 +83,12 @@ pub extern "C" fn bcomm_start_io_request(
 }
 // Create neon request and submit it
-let request_idx = bs.start_neon_request(request);
+let slot_idx = bs.start_neon_io_request(request);
 // Tell the communicator about it
-bs.submit_request(request_idx);
+bs.submit_request(slot_idx);
-request_idx
+slot_idx
 }
 #[unsafe(no_mangle)]
@@ -124,12 +124,12 @@ pub extern "C" fn bcomm_start_get_page_v_request(
 }
 // Create neon request and submit it
-let request_idx = bs.start_neon_request(request);
+let slot_idx = bs.start_neon_io_request(request);
 // Tell the communicator about it
-bs.submit_request(request_idx);
+bs.submit_request(slot_idx);
-request_idx
+slot_idx
 }
 /// Check if a request has completed. Returns:
@@ -139,10 +139,10 @@ pub extern "C" fn bcomm_start_get_page_v_request(
 #[unsafe(no_mangle)]
 pub extern "C" fn bcomm_poll_request_completion(
 bs: &mut CommunicatorBackendStruct,
-request_idx: u32,
+request_slot_idx: u32,
 result_p: &mut NeonIOResult,
 ) -> i32 {
-match bs.neon_request_slots[request_idx as usize].try_get_result() {
+match bs.neon_request_slots[request_slot_idx as usize].try_get_result() {
 None => -1, // still processing
 Some(result) => {
 *result_p = result;
@@ -188,7 +188,7 @@ pub extern "C" fn bcomm_cache_contains(
 impl<'t> CommunicatorBackendStruct<'t> {
 /// Send a wakeup to the communicator process
-fn submit_request(self: &CommunicatorBackendStruct<'t>, request_idx: i32) {
+fn submit_request(self: &CommunicatorBackendStruct<'t>, request_slot_idx: i32) {
 // wake up communicator by writing the idx to the submission pipe
 //
 // This can block, if the pipe is full. That should be very rare,
@@ -198,7 +198,7 @@ impl<'t> CommunicatorBackendStruct<'t> {
 // backend.
 //
 // If it does block very briefly, that's not too serious.
-let idxbuf = request_idx.to_ne_bytes();
+let idxbuf = request_slot_idx.to_ne_bytes();
 let _res = nix::unistd::write(&self.submission_pipe_write_fd, &idxbuf);
 // FIXME: check result, return any errors
@@ -206,15 +206,15 @@ impl<'t> CommunicatorBackendStruct<'t> {
 /// Note: there's no guarantee on when the communicator might pick it up. You should ring
 /// the doorbell. But it might pick it up immediately.
-pub(crate) fn start_neon_request(&mut self, request: &NeonIORequest) -> i32 {
+pub(crate) fn start_neon_io_request(&mut self, request: &NeonIORequest) -> i32 {
 let my_proc_number = self.my_proc_number;
 // Grab next free slot
 // FIXME: any guarantee that there will be any?
-let idx = self.next_neon_request_idx;
+let idx = self.next_request_slot_idx;
 let next_idx = idx + 1;
-self.next_neon_request_idx = if next_idx == self.my_end_idx {
+self.next_request_slot_idx = if next_idx == self.my_end_idx {
 self.my_start_idx
 } else {
 next_idx
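Putting the pieces of this file together: the backend grabs the next slot in its own range (wrapping at my_end_idx), fills in the request, and rings the doorbell by writing the 4-byte slot index to the submission pipe; it then polls bcomm_poll_request_completion, which returns -1 until the result is ready. A condensed sketch of the slot ring and doorbell, using std I/O in place of the nix call and an illustrative struct rather than the real CommunicatorBackendStruct:

    use std::fs::File;
    use std::io::Write;

    // Minimal sketch; the field names mirror the struct above, the type itself is made up.
    struct BackendSketch {
        next_request_slot_idx: u32,
        my_start_idx: u32,           // first request slot owned by this backend
        my_end_idx: u32,             // one past the last slot owned by this backend
        submission_pipe_write: File, // write end of the submission pipe
    }

    impl BackendSketch {
        // Grab the next slot index, wrapping around within this backend's own range.
        fn grab_slot(&mut self) -> u32 {
            let idx = self.next_request_slot_idx;
            let next_idx = idx + 1;
            self.next_request_slot_idx = if next_idx == self.my_end_idx {
                self.my_start_idx
            } else {
                next_idx
            };
            idx
        }

        // Ring the doorbell: the wakeup is just the slot index in native byte order,
        // so the communicator knows which slot to read the request from.
        fn ring_doorbell(&mut self, slot_idx: u32) -> std::io::Result<()> {
            self.submission_pipe_write.write_all(&slot_idx.to_ne_bytes())
        }
    }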

View File

@@ -626,6 +626,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
 None
 }
+/// Resize the local file cache.
 pub fn resize_file_cache(&self, num_blocks: u32) {
 let old_num_blocks = self.block_map.get_num_buckets() as u32;
@@ -638,6 +639,8 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
 err
 );
 }
+} else {
+// TODO: Shrinking not implemented yet
 }
 }
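The resize path above only grows the cache for now; shrinking is left as a TODO. A compact sketch of that guard, with a stand-in block map (the real IntegratedCacheWriteAccess and its error handling are more involved):

    // Stand-in for the cache's block map; only the bucket count matters here.
    struct BlockMapSketch {
        buckets: Vec<Option<u64>>,
    }

    impl BlockMapSketch {
        fn get_num_buckets(&self) -> usize {
            self.buckets.len()
        }

        fn grow(&mut self, new_num_buckets: usize) -> Result<(), String> {
            // the real code would also remap existing entries
            self.buckets.resize(new_num_buckets, None);
            Ok(())
        }
    }

    fn resize_file_cache(map: &mut BlockMapSketch, num_blocks: u32) {
        let old_num_blocks = map.get_num_buckets() as u32;
        if num_blocks > old_num_blocks {
            if let Err(err) = map.grow(num_blocks as usize) {
                eprintln!("could not grow file cache: {err}");
            }
        } else {
            // TODO: shrinking not implemented yet, as in the change above
        }
    }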

View File

@@ -1,7 +1,7 @@
-//! Glue code to hook up Rust logging, with the `tracing` crate, to the PostgreSQL log
+//! Glue code to hook up Rust logging with the `tracing` crate to the PostgreSQL log
 //!
 //! In the Rust threads, the log messages are written to a mpsc Channel, and the Postgres
-//! process latch is raised. That wakes up the loop in the main thread. It reads the
+//! process latch is raised. That wakes up the loop in the main thread. It reads the
 //! message from the channel and ereport()s it. This ensures that only one thread, the main
 //! thread, calls the PostgreSQL logging routines at any time.
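The forwarding scheme this header comment describes can be pictured roughly as follows. This is a sketch only: plain std mpsc is used, and raise_my_latch/ereport_log are placeholders for the SetLatch and ereport FFI calls, not the real bindings:

    use std::sync::mpsc::{Receiver, Sender, TryRecvError};

    fn raise_my_latch() { /* SetLatch(MyLatch) via FFI in the real code */ }
    fn ereport_log(msg: &str) { println!("LOG: {msg}"); }

    // Called from Rust worker threads: queue the message, then wake the main thread.
    fn forward_log(tx: &Sender<String>, msg: String) {
        // if the receiver is gone the process is shutting down; dropping the message is fine
        let _ = tx.send(msg);
        raise_my_latch();
    }

    // Called from the main thread's loop after the latch wakes it up: drain whatever
    // has accumulated and ereport() it. Only the main thread ever calls into the
    // PostgreSQL logging routines.
    fn drain_logs(rx: &Receiver<String>) {
        loop {
            match rx.try_recv() {
                Ok(msg) => ereport_log(&msg),
                Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
            }
        }
    }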

View File

@@ -189,34 +189,29 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
 loop {
 // Wait for a backend to ring the doorbell
 match submission_pipe_read.read(&mut idxbuf).await {
 Ok(4) => {}
 Ok(nbytes) => panic!("short read ({nbytes} bytes) on communicator pipe"),
 Err(e) => panic!("error reading from communicator pipe: {e}"),
 }
-let request_idx = u32::from_ne_bytes(idxbuf);
+let slot_idx = u32::from_ne_bytes(idxbuf) as usize;
 // Read the IO request from the slot indicated in the wakeup
-let Some(slot) =
-self.neon_request_slots[request_idx as usize].start_processing_request()
-else {
-// This currently should not happen. But if we have multiple threads picking up
+let Some(slot) = self.neon_request_slots[slot_idx].start_processing_request() else {
+// This currently should not happen. But if we had multiple threads picking up
 // requests, and without waiting for the notifications, it could.
 panic!("no request in slot");
 };
-// Ok, we have ownership of this request now. We must process
-// it now, there's no going back.
-//trace!("processing request {request_idx}: {request:?}");
+// Ok, we have ownership of this request now. We must process it now, there's no going
+// back.
+//
 // Spawn a separate task for every request. That's a little excessive for requests that
 // can be quickly satisfied from the cache, but we expect that to be rare, because the
 // requesting backend would have already checked the cache.
 tokio::spawn(async move {
 let request_id = slot.get_request().request_id();
-trace!("spawned task to process request {request_id} at slot {request_idx}");
+trace!("spawned task to process request {request_id} at slot {slot_idx}");
 // FIXME: as a temporary hack, abort request if we don't get a response promptly.
 // Lots of regression tests are getting stuck and failing at the moment,
@@ -232,7 +227,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
 info!("request {request_id} timed out");
 NeonIOResult::Error(libc::ETIMEDOUT)
 });
-trace!("request {request_id} at slot {request_idx} completed");
+trace!("request {request_id} at slot {slot_idx} completed");
 let owner_procno = slot.get_owner_procno();
 // Ok, we have completed the IO. Mark the request as completed. After that,
@@ -246,6 +241,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
 }
 }
+/// Compute the 'request_lsn' to use for a pageserver request
 fn request_lsns(&self, not_modified_since_lsn: Lsn) -> page_api::ReadLsn {
 let mut request_lsn = get_request_lsn();
@@ -287,6 +283,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
 }
 }
+/// Handle one IO request
 async fn handle_request(&'static self, req: &'_ NeonIORequest) -> NeonIOResult {
 match req {
 NeonIORequest::Empty => {
@@ -302,6 +299,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
 .lock(RequestInProgressKey::Rel(rel), req.request_id)
 .await;
+// Check the cache first
 let not_modified_since = match self.cache.get_rel_exists(&rel) {
 CacheResult::Found(exists) => return NeonIOResult::RelExists(exists),
 CacheResult::NotFound(lsn) => lsn,
@@ -413,7 +411,6 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
 NeonIORequest::WritePage(req) => {
 self.request_write_page_counter.inc();
-// Also store it in the LFC while we still have it
 let rel = req.reltag();
 let _in_progress_guard = self
 .in_progress_table
@@ -422,6 +419,9 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
 req.request_id,
 )
 .await;
+// We must at least update the last-written LSN on the page, but also store the page
+// image in the LFC while we still have it
 self.cache
 .remember_page(&rel, req.block_number, req.src, Lsn(req.lsn), true)
 .await;
@@ -438,6 +438,9 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
 req.request_id,
 )
 .await;
+// We must at least update the last-written LSN on the page and the relation size,
+// but also store the page image in the LFC while we still have it
 self.cache
 .remember_page(&rel, req.block_number, req.src, Lsn(req.lsn), true)
 .await;
@@ -481,6 +484,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
 }
 }
+/// Subroutine to handle a GetPageV request, since it's a little more complicated than
+/// others.
 async fn handle_get_pagev_request(&'t self, req: &CGetPageVRequest) -> Result<(), i32> {
 let rel = req.reltag();
@@ -488,10 +493,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
 //
 // Note: Because the backends perform a direct lookup in the cache before sending
 // the request to the communicator process, we expect the pages to almost never
-// be already in cache. It could happen when:
+// be already in cache. It could happen if:
 // 1. two backends try to read the same page at the same time, but that should never
 // happen because there's higher level locking in the Postgres buffer manager, or
-// 2. if a prefetch request finished at the same time as a backend requested the
+// 2. a prefetch request finished at the same time as a backend requested the
 // page. That's much more likely.
 let mut cache_misses = Vec::with_capacity(req.nblocks as usize);
 for i in 0..req.nblocks {
@@ -530,13 +535,12 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
 .max()
 .unwrap();
-// Form a pageserver request for the cache misses
+let read_lsn = self.request_lsns(not_modified_since);
+// Construct a pageserver request for the cache misses
 let block_numbers: Vec<u32> = cache_misses
 .iter()
 .map(|(blkno, _lsn, _dest, _guard)| *blkno)
 .collect();
-let read_lsn = self.request_lsns(not_modified_since);
 info!(
 "sending getpage request for blocks {:?} in rel {:?} lsns {}",
 block_numbers, rel, read_lsn
@@ -592,6 +596,10 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
 Ok(())
 }
+/// Subroutine to handle a PrefetchV request, since it's a little more complicated than
+/// others.
+///
+/// This is very similar to a GetPageV request, but the results are only stored in the cache.
 async fn handle_prefetchv_request(&'static self, req: &CPrefetchVRequest) -> Result<(), i32> {
 let rel = req.reltag();
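The FIXME above mentions a temporary per-request timeout whose body is cut off in this hunk: the handler is raced against a deadline and the request fails with ETIMEDOUT if the deadline wins. A rough sketch of that pattern (assuming the tokio runtime with the time and macros features plus the libc crate; the deadline, result type and handler here are simplified stand-ins, not the real ones):

    use std::time::Duration;

    #[derive(Debug)]
    enum NeonIOResult {
        Ok,
        Error(i32), // errno-style error code
    }

    // Stand-in for the real handle_request(); pretend it takes a little while.
    async fn handle_request_stub() -> NeonIOResult {
        tokio::time::sleep(Duration::from_millis(10)).await;
        NeonIOResult::Ok
    }

    #[tokio::main]
    async fn main() {
        let deadline = Duration::from_secs(30); // arbitrary value for this sketch
        let result = tokio::time::timeout(deadline, handle_request_stub())
            .await
            .unwrap_or_else(|_elapsed| {
                // the deadline expired before the handler finished
                NeonIOResult::Error(libc::ETIMEDOUT)
            });
        println!("{result:?}");
    }

On the real path the spawned task then marks the slot completed and wakes the owning backend, as the surrounding code in this hunk indicates.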