From e558e0da5c6a6dac048e5fd39825f5b88e09b749 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 4 Jul 2025 15:26:31 +0300 Subject: [PATCH] Assign request_id earlier, in the originating backend Makes it more useful for stitching together logs etc. for a specific request. --- pgxn/neon/communicator/src/neon_request.rs | 31 ++++++++++ .../src/worker_process/main_loop.rs | 8 +-- pgxn/neon/communicator_new.c | 62 +++++++++++++++++-- 3 files changed, 90 insertions(+), 11 deletions(-) diff --git a/pgxn/neon/communicator/src/neon_request.rs b/pgxn/neon/communicator/src/neon_request.rs index 95fab449f6..f54dcd9222 100644 --- a/pgxn/neon/communicator/src/neon_request.rs +++ b/pgxn/neon/communicator/src/neon_request.rs @@ -56,6 +56,26 @@ pub enum NeonIOResult { WriteOK, } +impl NeonIORequest { + pub fn request_id(&self) -> u64 { + use NeonIORequest::*; + match self { + Empty => 0, + RelExists(req) => req.request_id, + RelSize(req) => req.request_id, + GetPageV(req) => req.request_id, + PrefetchV(req) => req.request_id, + DbSize(req) => req.request_id, + WritePage(req) => req.request_id, + RelExtend(req) => req.request_id, + RelZeroExtend(req) => req.request_id, + RelCreate(req) => req.request_id, + RelTruncate(req) => req.request_id, + RelUnlink(req) => req.request_id, + } + } +} + #[repr(C)] #[derive(Copy, Clone, Debug)] pub struct CCachedGetPageVResult { @@ -118,6 +138,7 @@ impl ShmemBuf { #[repr(C)] #[derive(Copy, Clone, Debug)] pub struct CRelExistsRequest { + pub request_id: u64, pub spc_oid: COid, pub db_oid: COid, pub rel_number: u32, @@ -127,6 +148,7 @@ pub struct CRelExistsRequest { #[repr(C)] #[derive(Copy, Clone, Debug)] pub struct CRelSizeRequest { + pub request_id: u64, pub spc_oid: COid, pub db_oid: COid, pub rel_number: u32, @@ -136,6 +158,7 @@ pub struct CRelSizeRequest { #[repr(C)] #[derive(Copy, Clone, Debug)] pub struct CGetPageVRequest { + pub request_id: u64, pub spc_oid: COid, pub db_oid: COid, pub rel_number: u32, @@ -150,6 +173,7 @@ pub struct CGetPageVRequest { #[repr(C)] #[derive(Copy, Clone, Debug)] pub struct CPrefetchVRequest { + pub request_id: u64, pub spc_oid: COid, pub db_oid: COid, pub rel_number: u32, @@ -161,6 +185,7 @@ pub struct CPrefetchVRequest { #[repr(C)] #[derive(Copy, Clone, Debug)] pub struct CDbSizeRequest { + pub request_id: u64, pub db_oid: COid, pub request_lsn: CLsn, } @@ -168,6 +193,7 @@ pub struct CDbSizeRequest { #[repr(C)] #[derive(Copy, Clone, Debug)] pub struct CWritePageRequest { + pub request_id: u64, pub spc_oid: COid, pub db_oid: COid, pub rel_number: u32, @@ -182,6 +208,7 @@ pub struct CWritePageRequest { #[repr(C)] #[derive(Copy, Clone, Debug)] pub struct CRelExtendRequest { + pub request_id: u64, pub spc_oid: COid, pub db_oid: COid, pub rel_number: u32, @@ -196,6 +223,7 @@ pub struct CRelExtendRequest { #[repr(C)] #[derive(Copy, Clone, Debug)] pub struct CRelZeroExtendRequest { + pub request_id: u64, pub spc_oid: COid, pub db_oid: COid, pub rel_number: u32, @@ -208,6 +236,7 @@ pub struct CRelZeroExtendRequest { #[repr(C)] #[derive(Copy, Clone, Debug)] pub struct CRelCreateRequest { + pub request_id: u64, pub spc_oid: COid, pub db_oid: COid, pub rel_number: u32, @@ -217,6 +246,7 @@ pub struct CRelCreateRequest { #[repr(C)] #[derive(Copy, Clone, Debug)] pub struct CRelTruncateRequest { + pub request_id: u64, pub spc_oid: COid, pub db_oid: COid, pub rel_number: u32, @@ -227,6 +257,7 @@ pub struct CRelTruncateRequest { #[repr(C)] #[derive(Copy, Clone, Debug)] pub struct CRelUnlinkRequest { + pub request_id: u64, pub spc_oid: COid, pub db_oid: COid, pub rel_number: u32, diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index a96ea04706..e876c6a57c 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -3,7 +3,6 @@ use std::os::fd::AsRawFd; use std::os::fd::OwnedFd; use std::path::PathBuf; use std::str::FromStr as _; -use std::sync::atomic::{AtomicU64, Ordering}; use crate::backend_comms::NeonIOHandle; use crate::file_cache::FileCache; @@ -39,8 +38,6 @@ pub struct CommunicatorWorkerProcessStruct<'a> { submission_pipe_read_fd: OwnedFd, - next_request_id: AtomicU64, - in_progress_table: RequestInProgressTable, // Metrics @@ -154,7 +151,6 @@ pub(super) async fn init( client, cache, submission_pipe_read_fd: cis.submission_pipe_read_fd, - next_request_id: AtomicU64::new(1), in_progress_table: RequestInProgressTable::new(), // metrics @@ -515,7 +511,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { match self .client .get_page(page_api::GetPageRequest { - request_id: self.next_request_id.fetch_add(1, Ordering::Relaxed), + request_id: req.request_id, request_class: page_api::GetPageClass::Normal, read_lsn, rel, @@ -601,7 +597,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> { match self .client .get_page(page_api::GetPageRequest { - request_id: self.next_request_id.fetch_add(1, Ordering::Relaxed), + request_id: req.request_id, request_class: page_api::GetPageClass::Prefetch, read_lsn: self.request_lsns(not_modified_since), rel, diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c index 86ac402c74..bd8166f721 100644 --- a/pgxn/neon/communicator_new.c +++ b/pgxn/neon/communicator_new.c @@ -26,6 +26,7 @@ #include "miscadmin.h" #include "postmaster/bgworker.h" #include "postmaster/interrupt.h" +#include "postmaster/postmaster.h" #include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -73,6 +74,15 @@ typedef struct CommunicatorShmemPerBackendData */ Latch io_completion_latch; + /* + * Request counter, for assigning unique request IDs. + * + * This is only accessed by the backend itself, but we keep it in shared + * memory so that it survives across backend processes that are assigned + * the same proc number, to avoid reusing request IDs too fast. + */ + uint64 request_counter; + /* * Normally, when reading or writing pages from shared buffer cache, the * worker process can operate directly on the shared buffer. But when @@ -119,6 +129,36 @@ static void *bounce_write_if_needed(void *buffer); PGDLLEXPORT void communicator_new_bgworker_main(Datum main_arg); static void communicator_new_backend_exit(int code, Datum arg); +/* + * Request ID assignment. + * + * Request IDs better be unique across all this compute's in-flight requests, + * because they are used to match up responses to requests in the gRPC client + * code. Furthermore, for logging and debugging purposes, it's nice to avoid + * reusing them too fast, so that you can easily match up logs from different + * components based on the request id. + * + * The request IDs we generate consist of two parts: the backend's ProcNumber + * and a counter that can wrap-around. + */ +StaticAssertDecl(MAX_BACKENDS == 0x3FFFF, "Unexpected MAX_BACKENDS"); +#define PROCNUMBER_BITS UINT64CONST(18) +#define REQUEST_COUNTER_BITS UINT64CONST(46) +#define REQUEST_COUNTER_MASK ((UINT64CONST(1) << REQUEST_COUNTER_BITS) - 1) + +static inline uint64 +assign_request_id(void) +{ + uint64 counter; + uint64 result; + + counter = communicator_shmem_ptr->backends[MyProcNumber].request_counter++; + result = (((uint64) MyProcNumber) << PROCNUMBER_BITS) | (counter & REQUEST_COUNTER_MASK); + elog(LOG, "assigned request id " UINT64_FORMAT " (counter " UINT64_FORMAT ", procno %d)", result, counter, (int) MyProcNumber); + + return result; +} + /**** Initialization functions. These run in postmaster ****/ void @@ -206,7 +246,10 @@ communicator_new_shmem_startup(void) shmem_size -= communicator_size; for (int i = 0; i < MaxProcs; i++) + { InitSharedLatch(&communicator_shmem_ptr->backends[i].io_completion_latch); + communicator_shmem_ptr->backends[i].request_counter = 0; + } /* lfc_size_limit is in MBs */ initial_file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ); @@ -436,6 +479,7 @@ communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkNumber forkNu NeonIORequest request = { .tag = NeonIORequest_PrefetchV, .prefetch_v = { + .request_id = assign_request_id(), .spc_oid = NInfoGetSpcOid(rinfo), .db_oid = NInfoGetDbOid(rinfo), .rel_number = NInfoGetRelNumber(rinfo), @@ -452,7 +496,7 @@ communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkNumber forkNu if (num_inflight_requests >= MAX_INFLIGHT_ASYNC_REQUESTS) process_inflight_requests(); - request_idx = bcomm_start_io_request(my_bs, &request, &result); + request_idx = start_request(&request, &result); if (request_idx == -1) { /* -1 means the request was satisfied immediately. */ @@ -461,8 +505,6 @@ communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkNumber forkNu } inflight_requests[num_inflight_requests] = request_idx; num_inflight_requests++; - - elog(LOG, "sent prefetch request with idx %d", request_idx); } /* @@ -505,7 +547,7 @@ process_inflight_requests(void) * request is submitted. */ static void -perform_request(NeonIORequest * request, struct NeonIOResult *result_p) +perform_request(NeonIORequest *request, struct NeonIOResult *result_p) { int request_idx; @@ -521,7 +563,7 @@ perform_request(NeonIORequest * request, struct NeonIOResult *result_p) } static int -start_request(NeonIORequest * request, struct NeonIOResult *immediate_result_p) +start_request(NeonIORequest *request, struct NeonIOResult *immediate_result_p) { int request_idx; @@ -600,6 +642,7 @@ communicator_new_rel_exists(NRelFileInfo rinfo, ForkNumber forkNum) NeonIORequest request = { .tag = NeonIORequest_RelExists, .rel_exists = { + .request_id = assign_request_id(), .spc_oid = NInfoGetSpcOid(rinfo), .db_oid = NInfoGetDbOid(rinfo), .rel_number = NInfoGetRelNumber(rinfo), @@ -639,6 +682,7 @@ communicator_new_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumbe NeonIORequest request = { .tag = NeonIORequest_GetPageV, .get_page_v = { + .request_id = assign_request_id(), .spc_oid = NInfoGetSpcOid(rinfo), .db_oid = NInfoGetDbOid(rinfo), .rel_number = NInfoGetRelNumber(rinfo), @@ -764,6 +808,7 @@ communicator_new_rel_nblocks(NRelFileInfo rinfo, ForkNumber forkNum) NeonIORequest request = { .tag = NeonIORequest_RelSize, .rel_size = { + .request_id = assign_request_id(), .spc_oid = NInfoGetSpcOid(rinfo), .db_oid = NInfoGetDbOid(rinfo), .rel_number = NInfoGetRelNumber(rinfo), @@ -798,6 +843,7 @@ communicator_new_dbsize(Oid dbNode) NeonIORequest request = { .tag = NeonIORequest_DbSize, .db_size = { + .request_id = assign_request_id(), .db_oid = dbNode, } }; @@ -836,6 +882,7 @@ communicator_new_write_page(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber NeonIORequest request = { .tag = NeonIORequest_WritePage, .write_page = { + .request_id = assign_request_id(), .spc_oid = NInfoGetSpcOid(rinfo), .db_oid = NInfoGetDbOid(rinfo), .rel_number = NInfoGetRelNumber(rinfo), @@ -875,6 +922,7 @@ communicator_new_rel_extend(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber NeonIORequest request = { .tag = NeonIORequest_RelExtend, .rel_extend = { + .request_id = assign_request_id(), .spc_oid = NInfoGetSpcOid(rinfo), .db_oid = NInfoGetDbOid(rinfo), .rel_number = NInfoGetRelNumber(rinfo), @@ -913,6 +961,7 @@ communicator_new_rel_zeroextend(NRelFileInfo rinfo, ForkNumber forkNum, BlockNum NeonIORequest request = { .tag = NeonIORequest_RelZeroExtend, .rel_zero_extend = { + .request_id = assign_request_id(), .spc_oid = NInfoGetSpcOid(rinfo), .db_oid = NInfoGetDbOid(rinfo), .rel_number = NInfoGetRelNumber(rinfo), @@ -950,6 +999,7 @@ communicator_new_rel_create(NRelFileInfo rinfo, ForkNumber forkNum) NeonIORequest request = { .tag = NeonIORequest_RelCreate, .rel_create = { + .request_id = assign_request_id(), .spc_oid = NInfoGetSpcOid(rinfo), .db_oid = NInfoGetDbOid(rinfo), .rel_number = NInfoGetRelNumber(rinfo), @@ -981,6 +1031,7 @@ communicator_new_rel_truncate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumbe NeonIORequest request = { .tag = NeonIORequest_RelTruncate, .rel_truncate = { + .request_id = assign_request_id(), .spc_oid = NInfoGetSpcOid(rinfo), .db_oid = NInfoGetDbOid(rinfo), .rel_number = NInfoGetRelNumber(rinfo), @@ -1013,6 +1064,7 @@ communicator_new_rel_unlink(NRelFileInfo rinfo, ForkNumber forkNum) NeonIORequest request = { .tag = NeonIORequest_RelUnlink, .rel_unlink = { + .request_id = assign_request_id(), .spc_oid = NInfoGetSpcOid(rinfo), .db_oid = NInfoGetDbOid(rinfo), .rel_number = NInfoGetRelNumber(rinfo),