From f30c59bec9772d84f93184eeeebd40a79efc7175 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 10 Jul 2025 15:02:00 +0300 Subject: [PATCH] Improve comments on request slots --- pgxn/neon/communicator/src/backend_comms.rs | 84 +++++++++++-------- .../communicator/src/backend_interface.rs | 2 +- 2 files changed, 51 insertions(+), 35 deletions(-) diff --git a/pgxn/neon/communicator/src/backend_comms.rs b/pgxn/neon/communicator/src/backend_comms.rs index 5851bd0d7a..d2f65a8fd4 100644 --- a/pgxn/neon/communicator/src/backend_comms.rs +++ b/pgxn/neon/communicator/src/backend_comms.rs @@ -1,9 +1,10 @@ -//! This module implements a request/response "slot" for submitting requests from backends -//! to the communicator process. +//! This module implements a request/response "slot" for submitting +//! requests from backends to the communicator process. //! //! NB: The "backend" side of this code runs in Postgres backend processes, //! which means that it is not safe to use the 'tracing' crate for logging, nor -//! to launch threads or use tokio tasks. +//! to launch threads or use tokio tasks! + use std::cell::UnsafeCell; use std::sync::atomic::fence; use std::sync::atomic::{AtomicI32, Ordering}; @@ -12,7 +13,8 @@ use crate::neon_request::{NeonIORequest, NeonIOResult}; use atomic_enum::atomic_enum; -/// One request/response slot. Each backend has its own set of slots that it uses. +/// One request/response slot. Each backend has its own set of slots that it +/// uses. /// /// This is the moral equivalent of PgAioHandle for Postgres AIO requests /// Like PgAioHandle, try to keep this small. @@ -21,7 +23,7 @@ use atomic_enum::atomic_enum; /// /// ## Lifecycle of a request /// -/// The slot is always owned by either the backend process or the communicator +/// A slot is always owned by either the backend process or the communicator /// process, depending on the 'state'. Only the owning process is allowed to /// read or modify the slot, except for reading the 'state' itself to check who /// owns it. @@ -39,39 +41,41 @@ use atomic_enum::atomic_enum; /// slot for a new request. /// /// For correctness of the above protocol, we really only need two states: -/// "owned by backend" and "owned by communicator process. But to help with -/// debugging, there are a few more states. When the backend starts to fill in -/// the request details in the slot, it first sets the state from Idle to -/// Filling, and when it's done with that, from Filling to Submitted. In the -/// Filling state, the slot is still owned by the backend. Similarly, when the -/// communicator process starts to process a request, it sets it to Processing -/// state first, but the slot is still owned by the communicator process. +/// "owned by backend" and "owned by communicator process". But to help with +/// debugging and better assertions, there are a few more states. When the +/// backend starts to fill in the request details in the slot, it first sets the +/// state from Idle to Filling, and when it's done with that, from Filling to +/// Submitted. In the Filling state, the slot is still owned by the +/// backend. Similarly, when the communicator process starts to process a +/// request, it sets it to Processing state first, but the slot is still owned +/// by the communicator process. /// /// This struct doesn't handle waking up the communicator process when a request -/// has been submitted or when a response is ready. We only store the 'owner_procno' -/// which can be used for waking up the backend on completion, but the wakeups are -/// performed elsewhere. +/// has been submitted or when a response is ready. The 'owner_procno' is used +/// for waking up the backend on completion, but that happens elsewhere. pub struct NeonIORequestSlot { /// similar to PgAioHandleState state: AtomicNeonIORequestSlotState, - /// The owning process's ProcNumber. The worker process uses this to set the process's - /// latch on completion. + /// The owning process's ProcNumber. The worker process uses this to set the + /// process's latch on completion. /// - /// (This could be calculated from num_neon_request_slots_per_backend and the index of - /// this slot in the overall 'neon_requst_slots array') + /// (This could be calculated from num_neon_request_slots_per_backend and + /// the index of this slot in the overall 'neon_requst_slots array'. But we + /// prefer the communicator process to not know how the request slots are + /// divided between the backends.) owner_procno: AtomicI32, - /// SAFETY: This is modified by fill_request(), after it has established ownership - /// of the slot by setting state from Idle to Filling + /// SAFETY: This is modified by submit_request(), after it has established + /// ownership of the slot by setting state from Idle to Filling request: UnsafeCell, - /// valid when state is Completed + /// Valid when state is Completed /// - /// SAFETY: This is modified by RequestProcessingGuard::complete(). There can be - /// only one RequestProcessingGuard outstanding for a slot at a time, because - /// it is returned by start_processing_request() which checks the state, so - /// RequestProcessingGuard has exclusive access to the slot. + /// SAFETY: This is modified by RequestProcessingGuard::complete(). There + /// can be only one RequestProcessingGuard outstanding for a slot at a time, + /// because it is returned by start_processing_request() which checks the + /// state, so RequestProcessingGuard has exclusive access to the slot. result: UnsafeCell, } @@ -96,7 +100,7 @@ impl Default for NeonIORequestSlot { pub enum NeonIORequestSlotState { Idle, - /// backend is filling in the request + /// Backend is filling in the request Filling, /// Backend has submitted the request to the communicator, but the @@ -112,12 +116,17 @@ pub enum NeonIORequestSlotState { } impl NeonIORequestSlot { - pub fn fill_request(&self, request: &NeonIORequest, proc_number: i32) { - // Verify that the slot is in Idle state previously, and start filling it. + /// Write a request to the slot, and mark it as Submitted. + /// + /// Note: This does not wake up the worker process to actually process + /// the request. It's the caller's responsibility to do that. + pub fn submit_request(&self, request: &NeonIORequest, proc_number: i32) { + // Verify that the slot is in Idle state previously, and put it in + // Filling state. // - // XXX: This step isn't strictly necessary. Assuming the caller didn't screw up - // and try to use a slot that's already in use, we could fill the slot and - // switch it directly from Idle to Submitted state. + // XXX: This step isn't strictly necessary. Assuming the caller didn't + // screw up and try to use a slot that's already in use, we could fill + // the slot and switch it directly from Idle to Submitted state. if let Err(s) = self.state.compare_exchange( NeonIORequestSlotState::Idle, NeonIORequestSlotState::Filling, @@ -177,6 +186,12 @@ impl NeonIORequestSlot { } } +/// [`NeonIORequestSlot::start_processing_request`] returns this guard object to +/// indicate that the the caller now "owns" the slot, until it calls +/// [`RequestProcessingGuard::completed`]. +/// +/// TODO: implement Drop on this, to mark the request as Aborted or Errored +/// if [`RequestProcessingGuard::completed`] is not called. pub struct RequestProcessingGuard<'a>(&'a NeonIORequestSlot); unsafe impl<'a> Send for RequestProcessingGuard<'a> {} @@ -192,12 +207,13 @@ impl<'a> RequestProcessingGuard<'a> { } pub fn completed(self, result: NeonIOResult) { + // Store the result to the slot. unsafe { *self.0.result.get() = result; }; - // Ok, we have completed the IO. Mark the request as completed. After that, - // we no longer have ownership of the slot, and must not modify it. + // Mark the request as completed. After that, we no longer have + // ownership of the slot, and must not modify it. let old_state = self .0 .state diff --git a/pgxn/neon/communicator/src/backend_interface.rs b/pgxn/neon/communicator/src/backend_interface.rs index 91ecf3f4c4..a45583d5e3 100644 --- a/pgxn/neon/communicator/src/backend_interface.rs +++ b/pgxn/neon/communicator/src/backend_interface.rs @@ -232,6 +232,6 @@ impl<'t> CommunicatorBackendStruct<'t> { pub(crate) fn start_neon_io_request(&mut self, request_slot_idx: i32, request: &NeonIORequest) { let my_proc_number = self.my_proc_number; - self.neon_request_slots[request_slot_idx as usize].fill_request(request, my_proc_number); + self.neon_request_slots[request_slot_idx as usize].submit_request(request, my_proc_number); } }