Assign request_id earlier, in the originating backend

Makes it more useful for stitching together logs etc. for a specific
request.
This commit is contained in:
Heikki Linnakangas
2025-07-04 15:26:31 +03:00
parent 70bf2e088d
commit e558e0da5c
3 changed files with 90 additions and 11 deletions

View File

@@ -56,6 +56,26 @@ pub enum NeonIOResult {
WriteOK,
}
impl NeonIORequest {
pub fn request_id(&self) -> u64 {
use NeonIORequest::*;
match self {
Empty => 0,
RelExists(req) => req.request_id,
RelSize(req) => req.request_id,
GetPageV(req) => req.request_id,
PrefetchV(req) => req.request_id,
DbSize(req) => req.request_id,
WritePage(req) => req.request_id,
RelExtend(req) => req.request_id,
RelZeroExtend(req) => req.request_id,
RelCreate(req) => req.request_id,
RelTruncate(req) => req.request_id,
RelUnlink(req) => req.request_id,
}
}
}
#[repr(C)]
#[derive(Copy, Clone, Debug)]
pub struct CCachedGetPageVResult {
@@ -118,6 +138,7 @@ impl ShmemBuf {
#[repr(C)]
#[derive(Copy, Clone, Debug)]
pub struct CRelExistsRequest {
pub request_id: u64,
pub spc_oid: COid,
pub db_oid: COid,
pub rel_number: u32,
@@ -127,6 +148,7 @@ pub struct CRelExistsRequest {
#[repr(C)]
#[derive(Copy, Clone, Debug)]
pub struct CRelSizeRequest {
pub request_id: u64,
pub spc_oid: COid,
pub db_oid: COid,
pub rel_number: u32,
@@ -136,6 +158,7 @@ pub struct CRelSizeRequest {
#[repr(C)]
#[derive(Copy, Clone, Debug)]
pub struct CGetPageVRequest {
pub request_id: u64,
pub spc_oid: COid,
pub db_oid: COid,
pub rel_number: u32,
@@ -150,6 +173,7 @@ pub struct CGetPageVRequest {
#[repr(C)]
#[derive(Copy, Clone, Debug)]
pub struct CPrefetchVRequest {
pub request_id: u64,
pub spc_oid: COid,
pub db_oid: COid,
pub rel_number: u32,
@@ -161,6 +185,7 @@ pub struct CPrefetchVRequest {
#[repr(C)]
#[derive(Copy, Clone, Debug)]
pub struct CDbSizeRequest {
pub request_id: u64,
pub db_oid: COid,
pub request_lsn: CLsn,
}
@@ -168,6 +193,7 @@ pub struct CDbSizeRequest {
#[repr(C)]
#[derive(Copy, Clone, Debug)]
pub struct CWritePageRequest {
pub request_id: u64,
pub spc_oid: COid,
pub db_oid: COid,
pub rel_number: u32,
@@ -182,6 +208,7 @@ pub struct CWritePageRequest {
#[repr(C)]
#[derive(Copy, Clone, Debug)]
pub struct CRelExtendRequest {
pub request_id: u64,
pub spc_oid: COid,
pub db_oid: COid,
pub rel_number: u32,
@@ -196,6 +223,7 @@ pub struct CRelExtendRequest {
#[repr(C)]
#[derive(Copy, Clone, Debug)]
pub struct CRelZeroExtendRequest {
pub request_id: u64,
pub spc_oid: COid,
pub db_oid: COid,
pub rel_number: u32,
@@ -208,6 +236,7 @@ pub struct CRelZeroExtendRequest {
#[repr(C)]
#[derive(Copy, Clone, Debug)]
pub struct CRelCreateRequest {
pub request_id: u64,
pub spc_oid: COid,
pub db_oid: COid,
pub rel_number: u32,
@@ -217,6 +246,7 @@ pub struct CRelCreateRequest {
#[repr(C)]
#[derive(Copy, Clone, Debug)]
pub struct CRelTruncateRequest {
pub request_id: u64,
pub spc_oid: COid,
pub db_oid: COid,
pub rel_number: u32,
@@ -227,6 +257,7 @@ pub struct CRelTruncateRequest {
#[repr(C)]
#[derive(Copy, Clone, Debug)]
pub struct CRelUnlinkRequest {
pub request_id: u64,
pub spc_oid: COid,
pub db_oid: COid,
pub rel_number: u32,

View File

@@ -3,7 +3,6 @@ use std::os::fd::AsRawFd;
use std::os::fd::OwnedFd;
use std::path::PathBuf;
use std::str::FromStr as _;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::backend_comms::NeonIOHandle;
use crate::file_cache::FileCache;
@@ -39,8 +38,6 @@ pub struct CommunicatorWorkerProcessStruct<'a> {
submission_pipe_read_fd: OwnedFd,
next_request_id: AtomicU64,
in_progress_table: RequestInProgressTable,
// Metrics
@@ -154,7 +151,6 @@ pub(super) async fn init(
client,
cache,
submission_pipe_read_fd: cis.submission_pipe_read_fd,
next_request_id: AtomicU64::new(1),
in_progress_table: RequestInProgressTable::new(),
// metrics
@@ -515,7 +511,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
match self
.client
.get_page(page_api::GetPageRequest {
request_id: self.next_request_id.fetch_add(1, Ordering::Relaxed),
request_id: req.request_id,
request_class: page_api::GetPageClass::Normal,
read_lsn,
rel,
@@ -601,7 +597,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
match self
.client
.get_page(page_api::GetPageRequest {
request_id: self.next_request_id.fetch_add(1, Ordering::Relaxed),
request_id: req.request_id,
request_class: page_api::GetPageClass::Prefetch,
read_lsn: self.request_lsns(not_modified_since),
rel,

View File

@@ -26,6 +26,7 @@
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
#include "replication/walsender.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -73,6 +74,15 @@ typedef struct CommunicatorShmemPerBackendData
*/
Latch io_completion_latch;
/*
* Request counter, for assigning unique request IDs.
*
* This is only accessed by the backend itself, but we keep it in shared
* memory so that it survives across backend processes that are assigned
* the same proc number, to avoid reusing request IDs too fast.
*/
uint64 request_counter;
/*
* Normally, when reading or writing pages from shared buffer cache, the
* worker process can operate directly on the shared buffer. But when
@@ -119,6 +129,36 @@ static void *bounce_write_if_needed(void *buffer);
PGDLLEXPORT void communicator_new_bgworker_main(Datum main_arg);
static void communicator_new_backend_exit(int code, Datum arg);
/*
* Request ID assignment.
*
* Request IDs better be unique across all this compute's in-flight requests,
* because they are used to match up responses to requests in the gRPC client
* code. Furthermore, for logging and debugging purposes, it's nice to avoid
* reusing them too fast, so that you can easily match up logs from different
* components based on the request id.
*
* The request IDs we generate consist of two parts: the backend's ProcNumber
* and a counter that can wrap-around.
*/
StaticAssertDecl(MAX_BACKENDS == 0x3FFFF, "Unexpected MAX_BACKENDS");
#define PROCNUMBER_BITS UINT64CONST(18)
#define REQUEST_COUNTER_BITS UINT64CONST(46)
#define REQUEST_COUNTER_MASK ((UINT64CONST(1) << REQUEST_COUNTER_BITS) - 1)
static inline uint64
assign_request_id(void)
{
uint64 counter;
uint64 result;
counter = communicator_shmem_ptr->backends[MyProcNumber].request_counter++;
result = (((uint64) MyProcNumber) << PROCNUMBER_BITS) | (counter & REQUEST_COUNTER_MASK);
elog(LOG, "assigned request id " UINT64_FORMAT " (counter " UINT64_FORMAT ", procno %d)", result, counter, (int) MyProcNumber);
return result;
}
/**** Initialization functions. These run in postmaster ****/
void
@@ -206,7 +246,10 @@ communicator_new_shmem_startup(void)
shmem_size -= communicator_size;
for (int i = 0; i < MaxProcs; i++)
{
InitSharedLatch(&communicator_shmem_ptr->backends[i].io_completion_latch);
communicator_shmem_ptr->backends[i].request_counter = 0;
}
/* lfc_size_limit is in MBs */
initial_file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ);
@@ -436,6 +479,7 @@ communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkNumber forkNu
NeonIORequest request = {
.tag = NeonIORequest_PrefetchV,
.prefetch_v = {
.request_id = assign_request_id(),
.spc_oid = NInfoGetSpcOid(rinfo),
.db_oid = NInfoGetDbOid(rinfo),
.rel_number = NInfoGetRelNumber(rinfo),
@@ -452,7 +496,7 @@ communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkNumber forkNu
if (num_inflight_requests >= MAX_INFLIGHT_ASYNC_REQUESTS)
process_inflight_requests();
request_idx = bcomm_start_io_request(my_bs, &request, &result);
request_idx = start_request(&request, &result);
if (request_idx == -1)
{
/* -1 means the request was satisfied immediately. */
@@ -461,8 +505,6 @@ communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkNumber forkNu
}
inflight_requests[num_inflight_requests] = request_idx;
num_inflight_requests++;
elog(LOG, "sent prefetch request with idx %d", request_idx);
}
/*
@@ -505,7 +547,7 @@ process_inflight_requests(void)
* request is submitted.
*/
static void
perform_request(NeonIORequest * request, struct NeonIOResult *result_p)
perform_request(NeonIORequest *request, struct NeonIOResult *result_p)
{
int request_idx;
@@ -521,7 +563,7 @@ perform_request(NeonIORequest * request, struct NeonIOResult *result_p)
}
static int
start_request(NeonIORequest * request, struct NeonIOResult *immediate_result_p)
start_request(NeonIORequest *request, struct NeonIOResult *immediate_result_p)
{
int request_idx;
@@ -600,6 +642,7 @@ communicator_new_rel_exists(NRelFileInfo rinfo, ForkNumber forkNum)
NeonIORequest request = {
.tag = NeonIORequest_RelExists,
.rel_exists = {
.request_id = assign_request_id(),
.spc_oid = NInfoGetSpcOid(rinfo),
.db_oid = NInfoGetDbOid(rinfo),
.rel_number = NInfoGetRelNumber(rinfo),
@@ -639,6 +682,7 @@ communicator_new_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumbe
NeonIORequest request = {
.tag = NeonIORequest_GetPageV,
.get_page_v = {
.request_id = assign_request_id(),
.spc_oid = NInfoGetSpcOid(rinfo),
.db_oid = NInfoGetDbOid(rinfo),
.rel_number = NInfoGetRelNumber(rinfo),
@@ -764,6 +808,7 @@ communicator_new_rel_nblocks(NRelFileInfo rinfo, ForkNumber forkNum)
NeonIORequest request = {
.tag = NeonIORequest_RelSize,
.rel_size = {
.request_id = assign_request_id(),
.spc_oid = NInfoGetSpcOid(rinfo),
.db_oid = NInfoGetDbOid(rinfo),
.rel_number = NInfoGetRelNumber(rinfo),
@@ -798,6 +843,7 @@ communicator_new_dbsize(Oid dbNode)
NeonIORequest request = {
.tag = NeonIORequest_DbSize,
.db_size = {
.request_id = assign_request_id(),
.db_oid = dbNode,
}
};
@@ -836,6 +882,7 @@ communicator_new_write_page(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber
NeonIORequest request = {
.tag = NeonIORequest_WritePage,
.write_page = {
.request_id = assign_request_id(),
.spc_oid = NInfoGetSpcOid(rinfo),
.db_oid = NInfoGetDbOid(rinfo),
.rel_number = NInfoGetRelNumber(rinfo),
@@ -875,6 +922,7 @@ communicator_new_rel_extend(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber
NeonIORequest request = {
.tag = NeonIORequest_RelExtend,
.rel_extend = {
.request_id = assign_request_id(),
.spc_oid = NInfoGetSpcOid(rinfo),
.db_oid = NInfoGetDbOid(rinfo),
.rel_number = NInfoGetRelNumber(rinfo),
@@ -913,6 +961,7 @@ communicator_new_rel_zeroextend(NRelFileInfo rinfo, ForkNumber forkNum, BlockNum
NeonIORequest request = {
.tag = NeonIORequest_RelZeroExtend,
.rel_zero_extend = {
.request_id = assign_request_id(),
.spc_oid = NInfoGetSpcOid(rinfo),
.db_oid = NInfoGetDbOid(rinfo),
.rel_number = NInfoGetRelNumber(rinfo),
@@ -950,6 +999,7 @@ communicator_new_rel_create(NRelFileInfo rinfo, ForkNumber forkNum)
NeonIORequest request = {
.tag = NeonIORequest_RelCreate,
.rel_create = {
.request_id = assign_request_id(),
.spc_oid = NInfoGetSpcOid(rinfo),
.db_oid = NInfoGetDbOid(rinfo),
.rel_number = NInfoGetRelNumber(rinfo),
@@ -981,6 +1031,7 @@ communicator_new_rel_truncate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumbe
NeonIORequest request = {
.tag = NeonIORequest_RelTruncate,
.rel_truncate = {
.request_id = assign_request_id(),
.spc_oid = NInfoGetSpcOid(rinfo),
.db_oid = NInfoGetDbOid(rinfo),
.rel_number = NInfoGetRelNumber(rinfo),
@@ -1013,6 +1064,7 @@ communicator_new_rel_unlink(NRelFileInfo rinfo, ForkNumber forkNum)
NeonIORequest request = {
.tag = NeonIORequest_RelUnlink,
.rel_unlink = {
.request_id = assign_request_id(),
.spc_oid = NInfoGetSpcOid(rinfo),
.db_oid = NInfoGetDbOid(rinfo),
.rel_number = NInfoGetRelNumber(rinfo),