Rebase with main

This commit is contained in:
Konstantin Knizhnik
2025-04-10 09:47:32 +03:00
parent fcfaf0a3d0
commit fa4bd2b901
4 changed files with 8 additions and 25 deletions

View File

@@ -88,9 +88,6 @@ typedef PGAlignedBlock PGIOAlignedBlock;
page_server_api *page_server;
static uint32 local_request_counter;
#define GENERATE_REQUEST_ID() (((NeonRequestId)MyProcPid << 32) | ++local_request_counter)
/*
* Various settings related to prompt (fast) handling of PageStream responses
* at any CHECK_FOR_INTERRUPTS point.
@@ -794,7 +791,7 @@ prefetch_read(PrefetchRequest *slot)
* Prefetch result should be placed in LFC by prefetch_wait_for.
*/
bool
prefetch_receive(BufferTag tag)
communicator_prefetch_receive(BufferTag tag)
{
PrfHashEntry *entry;
PrefetchRequest hashkey;
@@ -927,7 +924,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
NeonGetPageRequest request = {
.hdr.tag = T_NeonGetPageRequest,
.hdr.reqid = GENERATE_REQUEST_ID(),
/* lsn and not_modified_since are filled in below */
.rinfo = BufTagGetNRelFileInfo(slot->buftag),
.forknum = slot->buftag.forkNum,
@@ -936,8 +932,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(mySlotNo == MyPState->ring_unused);
slot->reqid = request.hdr.reqid;
if (force_request_lsns)
slot->request_lsns = *force_request_lsns;
else
@@ -955,6 +949,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(mySlotNo == MyPState->ring_unused);
/* loop */
}
slot->reqid = request.hdr.reqid;
/* update prefetch state */
MyPState->n_requests_inflight += 1;
@@ -1958,7 +1953,6 @@ communicator_exists(NRelFileInfo rinfo, ForkNumber forkNum, neon_request_lsns *r
{
NeonExistsRequest request = {
.hdr.tag = T_NeonExistsRequest,
.hdr.reqid = GENERATE_REQUEST_ID(),
.hdr.lsn = request_lsns->request_lsn,
.hdr.not_modified_since = request_lsns->not_modified_since,
.rinfo = rinfo,
@@ -2233,7 +2227,6 @@ communicator_nblocks(NRelFileInfo rinfo, ForkNumber forknum, neon_request_lsns *
{
NeonNblocksRequest request = {
.hdr.tag = T_NeonNblocksRequest,
.hdr.reqid = GENERATE_REQUEST_ID(),
.hdr.lsn = request_lsns->request_lsn,
.hdr.not_modified_since = request_lsns->not_modified_since,
.rinfo = rinfo,
@@ -2306,7 +2299,6 @@ communicator_dbsize(Oid dbNode, neon_request_lsns *request_lsns)
{
NeonDbSizeRequest request = {
.hdr.tag = T_NeonDbSizeRequest,
.hdr.reqid = GENERATE_REQUEST_ID(),
.hdr.lsn = request_lsns->request_lsn,
.hdr.not_modified_since = request_lsns->not_modified_since,
.dbNode = dbNode,
@@ -2374,7 +2366,6 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
request = (NeonGetSlruSegmentRequest) {
.hdr.tag = T_NeonGetSlruSegmentRequest,
.hdr.reqid = GENERATE_REQUEST_ID(),
.hdr.lsn = request_lsns->request_lsn,
.hdr.not_modified_since = request_lsns->not_modified_since,
.kind = kind,

View File

@@ -37,6 +37,8 @@ extern int communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum,
BlockNumber nblocks, void **buffers, bits8 *mask);
extern void communicator_prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
BlockNumber nblocks, const bits8 *mask);
extern bool communicator_prefetch_receive(BufferTag tag);
extern int communicator_read_slru_segment(SlruKind kind, int64 segno,
neon_request_lsns *request_lsns,
void *buffer);

View File

@@ -46,6 +46,8 @@
#include "neon.h"
#include "neon_lwlsncache.h"
#include "neon_perf_counters.h"
#include "pagestore_client.h"
#include "communicator.h"
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
@@ -747,7 +749,7 @@ lfc_prewarm(FileCacheState* fcs, uint32 worker_id, uint32 n_workers)
{
tag = fcs->chunks[snd_idx >> fcs_chunk_size_log];
tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1);
(void)prefetch_register_bufferv(tag, NULL, 1, NULL, true);
(void)communicator_prefetch_register_bufferv(tag, NULL, 1, NULL);
n_sent += 1;
}
snd_idx += 1;
@@ -772,7 +774,7 @@ lfc_prewarm(FileCacheState* fcs, uint32 worker_id, uint32 n_workers)
tag = fcs->chunks[rcv_idx >> fcs_chunk_size_log];
tag.blockNum += rcv_idx & ((1 << fcs_chunk_size_log) - 1);
if (prefetch_receive(tag))
if (communicator_prefetch_receive(tag))
{
ws->prewarmed_pages += 1;
}

View File

@@ -58,17 +58,6 @@ typedef struct
#define messageTag(m) (((const NeonMessage *)(m))->tag)
<<<<<<< HEAD
=======
#define NEON_TAG "[NEON_SMGR] "
#define neon_log(tag, fmt, ...) ereport(tag, \
(errmsg(NEON_TAG fmt, ##__VA_ARGS__), \
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
#define neon_shard_log(shard_no, tag, fmt, ...) ereport(tag, \
(errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
>>>>>>> 7dbb65404 (Use standard prefetch mechanism for geting prewarm results fropm page server)
/* SLRUs downloadable from page server */
typedef enum {
SLRU_CLOG,
@@ -294,4 +283,3 @@ extern void forget_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum);
#endif /* PAGESTORE_CLIENT_H */
k