diff --git a/pgxn/neon/communicator.c b/pgxn/neon/communicator.c index 142590c66d..61bb3206e7 100644 --- a/pgxn/neon/communicator.c +++ b/pgxn/neon/communicator.c @@ -88,9 +88,6 @@ typedef PGAlignedBlock PGIOAlignedBlock; page_server_api *page_server; -static uint32 local_request_counter; -#define GENERATE_REQUEST_ID() (((NeonRequestId)MyProcPid << 32) | ++local_request_counter) - /* * Various settings related to prompt (fast) handling of PageStream responses * at any CHECK_FOR_INTERRUPTS point. @@ -794,7 +791,7 @@ prefetch_read(PrefetchRequest *slot) * Prefetch result should be placed in LFC by prefetch_wait_for. */ bool -prefetch_receive(BufferTag tag) +communicator_prefetch_receive(BufferTag tag) { PrfHashEntry *entry; PrefetchRequest hashkey; @@ -927,7 +924,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns NeonGetPageRequest request = { .hdr.tag = T_NeonGetPageRequest, - .hdr.reqid = GENERATE_REQUEST_ID(), /* lsn and not_modified_since are filled in below */ .rinfo = BufTagGetNRelFileInfo(slot->buftag), .forknum = slot->buftag.forkNum, @@ -936,8 +932,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns Assert(mySlotNo == MyPState->ring_unused); - slot->reqid = request.hdr.reqid; - if (force_request_lsns) slot->request_lsns = *force_request_lsns; else @@ -955,6 +949,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns Assert(mySlotNo == MyPState->ring_unused); /* loop */ } + slot->reqid = request.hdr.reqid; /* update prefetch state */ MyPState->n_requests_inflight += 1; @@ -1958,7 +1953,6 @@ communicator_exists(NRelFileInfo rinfo, ForkNumber forkNum, neon_request_lsns *r { NeonExistsRequest request = { .hdr.tag = T_NeonExistsRequest, - .hdr.reqid = GENERATE_REQUEST_ID(), .hdr.lsn = request_lsns->request_lsn, .hdr.not_modified_since = request_lsns->not_modified_since, .rinfo = rinfo, @@ -2233,7 +2227,6 @@ communicator_nblocks(NRelFileInfo rinfo, ForkNumber forknum, neon_request_lsns * { NeonNblocksRequest request = { .hdr.tag = T_NeonNblocksRequest, - .hdr.reqid = GENERATE_REQUEST_ID(), .hdr.lsn = request_lsns->request_lsn, .hdr.not_modified_since = request_lsns->not_modified_since, .rinfo = rinfo, @@ -2306,7 +2299,6 @@ communicator_dbsize(Oid dbNode, neon_request_lsns *request_lsns) { NeonDbSizeRequest request = { .hdr.tag = T_NeonDbSizeRequest, - .hdr.reqid = GENERATE_REQUEST_ID(), .hdr.lsn = request_lsns->request_lsn, .hdr.not_modified_since = request_lsns->not_modified_since, .dbNode = dbNode, @@ -2374,7 +2366,6 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re request = (NeonGetSlruSegmentRequest) { .hdr.tag = T_NeonGetSlruSegmentRequest, - .hdr.reqid = GENERATE_REQUEST_ID(), .hdr.lsn = request_lsns->request_lsn, .hdr.not_modified_since = request_lsns->not_modified_since, .kind = kind, diff --git a/pgxn/neon/communicator.h b/pgxn/neon/communicator.h index 72cba526c1..f55c4b10f1 100644 --- a/pgxn/neon/communicator.h +++ b/pgxn/neon/communicator.h @@ -37,6 +37,8 @@ extern int communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber nblocks, void **buffers, bits8 *mask); extern void communicator_prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns, BlockNumber nblocks, const bits8 *mask); +extern bool communicator_prefetch_receive(BufferTag tag); + extern int communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *request_lsns, void *buffer); diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index f8785adc56..838afdc804 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -46,6 +46,8 @@ #include "neon.h" #include "neon_lwlsncache.h" #include "neon_perf_counters.h" +#include "pagestore_client.h" +#include "communicator.h" #define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0) @@ -747,7 +749,7 @@ lfc_prewarm(FileCacheState* fcs, uint32 worker_id, uint32 n_workers) { tag = fcs->chunks[snd_idx >> fcs_chunk_size_log]; tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1); - (void)prefetch_register_bufferv(tag, NULL, 1, NULL, true); + (void)communicator_prefetch_register_bufferv(tag, NULL, 1, NULL); n_sent += 1; } snd_idx += 1; @@ -772,7 +774,7 @@ lfc_prewarm(FileCacheState* fcs, uint32 worker_id, uint32 n_workers) tag = fcs->chunks[rcv_idx >> fcs_chunk_size_log]; tag.blockNum += rcv_idx & ((1 << fcs_chunk_size_log) - 1); - if (prefetch_receive(tag)) + if (communicator_prefetch_receive(tag)) { ws->prewarmed_pages += 1; } diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index e5efd01a6d..b1dc0ec63a 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -58,17 +58,6 @@ typedef struct #define messageTag(m) (((const NeonMessage *)(m))->tag) -<<<<<<< HEAD -======= -#define NEON_TAG "[NEON_SMGR] " -#define neon_log(tag, fmt, ...) ereport(tag, \ - (errmsg(NEON_TAG fmt, ##__VA_ARGS__), \ - errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0))) -#define neon_shard_log(shard_no, tag, fmt, ...) ereport(tag, \ - (errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \ - errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0))) - ->>>>>>> 7dbb65404 (Use standard prefetch mechanism for geting prewarm results fropm page server) /* SLRUs downloadable from page server */ typedef enum { SLRU_CLOG, @@ -294,4 +283,3 @@ extern void forget_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum); #endif /* PAGESTORE_CLIENT_H */ -k