mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 20:42:54 +00:00
Refactor the request LSNs to a separate struct (#7708)
We had a lot of code that passed around the two LSNs that are associated with each GetPage request. Introduce a new struct to encapsulate them. I'm about to add a third LSN to the struct in the next commit, this is a mechanical refactoring in preparation for that.
This commit is contained in:
committed by
Heikki Linnakangas
parent
3a6fa76828
commit
ba20752b76
@@ -237,18 +237,38 @@ extern void neon_zeroextend(SMgrRelation reln, ForkNumber forknum,
|
||||
extern bool neon_prefetch(SMgrRelation reln, ForkNumber forknum,
|
||||
BlockNumber blocknum);
|
||||
|
||||
/*
|
||||
* LSN values associated with each request to the pageserver
|
||||
*/
|
||||
typedef struct
|
||||
{
|
||||
/*
|
||||
* 'request_lsn' is the main value that determines which page version to
|
||||
* fetch.
|
||||
*/
|
||||
XLogRecPtr request_lsn;
|
||||
|
||||
/*
|
||||
* A hint to the pageserver that the requested page hasn't been modified
|
||||
* between this LSN and 'request_lsn'. That allows the pageserver to
|
||||
* return the page faster, without waiting for 'request_lsn' to arrive in
|
||||
* the pageserver, as long as 'not_modified_since' has arrived.
|
||||
*/
|
||||
XLogRecPtr not_modified_since;
|
||||
} neon_request_lsns;
|
||||
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
extern void neon_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
char *buffer);
|
||||
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, char *buffer);
|
||||
neon_request_lsns request_lsns, char *buffer);
|
||||
extern void neon_write(SMgrRelation reln, ForkNumber forknum,
|
||||
BlockNumber blocknum, char *buffer, bool skipFsync);
|
||||
#else
|
||||
extern void neon_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
void *buffer);
|
||||
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, void *buffer);
|
||||
neon_request_lsns request_lsns, void *buffer);
|
||||
extern void neon_write(SMgrRelation reln, ForkNumber forknum,
|
||||
BlockNumber blocknum, const void *buffer, bool skipFsync);
|
||||
#endif
|
||||
|
||||
@@ -168,8 +168,7 @@ typedef enum PrefetchStatus
|
||||
typedef struct PrefetchRequest
|
||||
{
|
||||
BufferTag buftag; /* must be first entry in the struct */
|
||||
XLogRecPtr request_lsn;
|
||||
XLogRecPtr not_modified_since;
|
||||
neon_request_lsns request_lsns;
|
||||
NeonResponse *response; /* may be null */
|
||||
PrefetchStatus status;
|
||||
shardno_t shard_no;
|
||||
@@ -271,16 +270,15 @@ static PrefetchState *MyPState;
|
||||
|
||||
static bool compact_prefetch_buffers(void);
|
||||
static void consume_prefetch_responses(void);
|
||||
static uint64 prefetch_register_buffer(BufferTag tag, XLogRecPtr *force_request_lsn, XLogRecPtr *force_not_modified_since);
|
||||
static uint64 prefetch_register_buffer(BufferTag tag, neon_request_lsns *force_request_lsns);
|
||||
static bool prefetch_read(PrefetchRequest *slot);
|
||||
static void prefetch_do_request(PrefetchRequest *slot, XLogRecPtr *force_request_lsn, XLogRecPtr *force_not_modified_since);
|
||||
static void prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns);
|
||||
static bool prefetch_wait_for(uint64 ring_index);
|
||||
static void prefetch_cleanup_trailing_unused(void);
|
||||
static inline void prefetch_set_unused(uint64 ring_index);
|
||||
|
||||
static void neon_get_request_lsn(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
XLogRecPtr *request_lsn, XLogRecPtr *not_modified_since);
|
||||
static bool neon_prefetch_response_usable(XLogRecPtr request_lsn, XLogRecPtr not_modified_since,
|
||||
static neon_request_lsns neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno);
|
||||
static bool neon_prefetch_response_usable(neon_request_lsns request_lsns,
|
||||
PrefetchRequest *slot);
|
||||
|
||||
static bool
|
||||
@@ -338,8 +336,7 @@ compact_prefetch_buffers(void)
|
||||
target_slot->shard_no = source_slot->shard_no;
|
||||
target_slot->status = source_slot->status;
|
||||
target_slot->response = source_slot->response;
|
||||
target_slot->request_lsn = source_slot->request_lsn;
|
||||
target_slot->not_modified_since = source_slot->not_modified_since;
|
||||
target_slot->request_lsns = source_slot->request_lsns;
|
||||
target_slot->my_ring_index = empty_ring_index;
|
||||
|
||||
prfh_delete(MyPState->prf_hash, source_slot);
|
||||
@@ -358,8 +355,9 @@ compact_prefetch_buffers(void)
|
||||
};
|
||||
source_slot->response = NULL;
|
||||
source_slot->my_ring_index = 0;
|
||||
source_slot->request_lsn = InvalidXLogRecPtr;
|
||||
source_slot->not_modified_since = InvalidXLogRecPtr;
|
||||
source_slot->request_lsns = (neon_request_lsns) {
|
||||
InvalidXLogRecPtr, InvalidXLogRecPtr
|
||||
};
|
||||
|
||||
/* update bookkeeping */
|
||||
n_moved++;
|
||||
@@ -689,7 +687,7 @@ prefetch_set_unused(uint64 ring_index)
|
||||
* prefetch_wait_for().
|
||||
*/
|
||||
static void
|
||||
prefetch_do_request(PrefetchRequest *slot, XLogRecPtr *force_request_lsn, XLogRecPtr *force_not_modified_since)
|
||||
prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns)
|
||||
{
|
||||
bool found;
|
||||
NeonGetPageRequest request = {
|
||||
@@ -700,23 +698,14 @@ prefetch_do_request(PrefetchRequest *slot, XLogRecPtr *force_request_lsn, XLogRe
|
||||
.blkno = slot->buftag.blockNum,
|
||||
};
|
||||
|
||||
Assert(((force_request_lsn != NULL) == (force_not_modified_since != NULL)));
|
||||
|
||||
if (force_request_lsn)
|
||||
{
|
||||
request.req.lsn = *force_request_lsn;
|
||||
request.req.not_modified_since = *force_not_modified_since;
|
||||
}
|
||||
if (force_request_lsns)
|
||||
slot->request_lsns = *force_request_lsns;
|
||||
else
|
||||
{
|
||||
neon_get_request_lsn(BufTagGetNRelFileInfo(slot->buftag),
|
||||
slot->buftag.forkNum,
|
||||
slot->buftag.blockNum,
|
||||
&request.req.lsn,
|
||||
&request.req.not_modified_since);
|
||||
}
|
||||
slot->request_lsn = request.req.lsn;
|
||||
slot->not_modified_since = request.req.not_modified_since;
|
||||
slot->request_lsns = neon_get_request_lsns(BufTagGetNRelFileInfo(slot->buftag),
|
||||
slot->buftag.forkNum,
|
||||
slot->buftag.blockNum);
|
||||
request.req.lsn = slot->request_lsns.request_lsn;
|
||||
request.req.not_modified_since = slot->request_lsns.not_modified_since;
|
||||
|
||||
Assert(slot->response == NULL);
|
||||
Assert(slot->my_ring_index == MyPState->ring_unused);
|
||||
@@ -742,25 +731,22 @@ prefetch_do_request(PrefetchRequest *slot, XLogRecPtr *force_request_lsn, XLogRe
|
||||
*
|
||||
* Register that we may want the contents of BufferTag in the near future.
|
||||
*
|
||||
* If force_request_lsn and force_not_modified_since are not NULL, those
|
||||
* values are sent to the pageserver. If they are NULL, we utilize the
|
||||
* lastWrittenLsn -infrastructure to fill them in.
|
||||
* If force_request_lsns is not NULL, those values are sent to the
|
||||
* pageserver. If NULL, we utilize the lastWrittenLsn -infrastructure
|
||||
* to calculate the LSNs to send.
|
||||
*
|
||||
* NOTE: this function may indirectly update MyPState->pfs_hash; which
|
||||
* invalidates any active pointers into the hash table.
|
||||
*/
|
||||
|
||||
static uint64
|
||||
prefetch_register_buffer(BufferTag tag, XLogRecPtr *force_request_lsn,
|
||||
XLogRecPtr *force_not_modified_since)
|
||||
prefetch_register_buffer(BufferTag tag, neon_request_lsns *force_request_lsns)
|
||||
{
|
||||
uint64 ring_index;
|
||||
PrefetchRequest req;
|
||||
PrefetchRequest *slot;
|
||||
PrfHashEntry *entry;
|
||||
|
||||
Assert(((force_request_lsn != NULL) == (force_not_modified_since != NULL)));
|
||||
|
||||
/* use an intermediate PrefetchRequest struct to ensure correct alignment */
|
||||
req.buftag = tag;
|
||||
Retry:
|
||||
@@ -781,10 +767,9 @@ Retry:
|
||||
* If the caller specified a request LSN to use, only accept prefetch
|
||||
* responses that satisfy that request.
|
||||
*/
|
||||
if (force_request_lsn)
|
||||
if (force_request_lsns)
|
||||
{
|
||||
if (!neon_prefetch_response_usable(*force_request_lsn,
|
||||
*force_not_modified_since, slot))
|
||||
if (!neon_prefetch_response_usable(*force_request_lsns, slot))
|
||||
{
|
||||
/* Wait for the old request to finish and discard it */
|
||||
if (!prefetch_wait_for(ring_index))
|
||||
@@ -886,7 +871,7 @@ Retry:
|
||||
slot->shard_no = get_shard_number(&tag);
|
||||
slot->my_ring_index = ring_index;
|
||||
|
||||
prefetch_do_request(slot, force_request_lsn, force_not_modified_since);
|
||||
prefetch_do_request(slot, force_request_lsns);
|
||||
Assert(slot->status == PRFS_REQUESTED);
|
||||
Assert(MyPState->ring_last <= ring_index &&
|
||||
ring_index < MyPState->ring_unused);
|
||||
@@ -1529,11 +1514,11 @@ nm_adjust_lsn(XLogRecPtr lsn)
|
||||
/*
|
||||
* Return LSN for requesting pages and number of blocks from page server
|
||||
*/
|
||||
static void
|
||||
neon_get_request_lsn(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
XLogRecPtr *request_lsn, XLogRecPtr *not_modified_since)
|
||||
static neon_request_lsns
|
||||
neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno)
|
||||
{
|
||||
XLogRecPtr last_written_lsn;
|
||||
neon_request_lsns result;
|
||||
|
||||
last_written_lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
|
||||
last_written_lsn = nm_adjust_lsn(last_written_lsn);
|
||||
@@ -1542,12 +1527,12 @@ neon_get_request_lsn(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
if (RecoveryInProgress())
|
||||
{
|
||||
/* Request the page at the last replayed LSN. */
|
||||
*request_lsn = GetXLogReplayRecPtr(NULL);
|
||||
*not_modified_since = last_written_lsn;
|
||||
Assert(last_written_lsn <= *request_lsn);
|
||||
result.request_lsn = GetXLogReplayRecPtr(NULL);
|
||||
result.not_modified_since = last_written_lsn;
|
||||
Assert(last_written_lsn <= result.request_lsn);
|
||||
|
||||
neon_log(DEBUG1, "neon_get_request_lsn request lsn %X/%X, not_modified_since %X/%X",
|
||||
LSN_FORMAT_ARGS(*request_lsn), LSN_FORMAT_ARGS(*not_modified_since));
|
||||
neon_log(DEBUG1, "neon_get_request_lsns request lsn %X/%X, not_modified_since %X/%X",
|
||||
LSN_FORMAT_ARGS(result.request_lsn), LSN_FORMAT_ARGS(result.not_modified_since));
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -1559,7 +1544,7 @@ neon_get_request_lsn(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
* must still in the buffer cache, so our request cannot concern
|
||||
* those.
|
||||
*/
|
||||
neon_log(DEBUG1, "neon_get_request_lsn GetLastWrittenLSN lsn %X/%X ",
|
||||
neon_log(DEBUG1, "neon_get_request_lsns GetLastWrittenLSN lsn %X/%X",
|
||||
LSN_FORMAT_ARGS(last_written_lsn));
|
||||
|
||||
/*
|
||||
@@ -1592,9 +1577,11 @@ neon_get_request_lsn(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
* flush, it should still be in the buffer cache, and we wouldn't be
|
||||
* requesting it.
|
||||
*/
|
||||
*request_lsn = flushlsn;
|
||||
*not_modified_since = last_written_lsn;
|
||||
result.request_lsn = flushlsn;
|
||||
result.not_modified_since = last_written_lsn;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1604,12 +1591,12 @@ neon_get_request_lsn(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
* satisfy a page read now.
|
||||
*/
|
||||
static bool
|
||||
neon_prefetch_response_usable(XLogRecPtr request_lsn, XLogRecPtr not_modified_since,
|
||||
neon_prefetch_response_usable(neon_request_lsns request_lsns,
|
||||
PrefetchRequest *slot)
|
||||
{
|
||||
/* sanity check the LSN's on the old and the new request */
|
||||
Assert(request_lsn >= not_modified_since);
|
||||
Assert(slot->request_lsn >= slot->not_modified_since);
|
||||
Assert(request_lsns.request_lsn >= request_lsns.not_modified_since);
|
||||
Assert(slot->request_lsns.request_lsn >= slot->request_lsns.not_modified_since);
|
||||
Assert(slot->status != PRFS_UNUSED);
|
||||
|
||||
/*
|
||||
@@ -1627,14 +1614,15 @@ neon_prefetch_response_usable(XLogRecPtr request_lsn, XLogRecPtr not_modified_si
|
||||
* calculate LSNs "out of order" with each other, but the prefetch queue
|
||||
* is backend-private at the moment.)
|
||||
*/
|
||||
if (request_lsn < slot->request_lsn || not_modified_since < slot->not_modified_since)
|
||||
if (request_lsns.request_lsn < slot->request_lsns.request_lsn ||
|
||||
request_lsns.not_modified_since < slot->request_lsns.not_modified_since)
|
||||
{
|
||||
ereport(LOG,
|
||||
(errcode(ERRCODE_IO_ERROR),
|
||||
errmsg(NEON_TAG "request with unexpected LSN after prefetch"),
|
||||
errdetail("Request %X/%X not_modified_since %X/%X, prefetch %X/%X not_modified_since %X/%X)",
|
||||
LSN_FORMAT_ARGS(request_lsn), LSN_FORMAT_ARGS(not_modified_since),
|
||||
LSN_FORMAT_ARGS(slot->request_lsn), LSN_FORMAT_ARGS(slot->not_modified_since))));
|
||||
LSN_FORMAT_ARGS(request_lsns.request_lsn), LSN_FORMAT_ARGS(request_lsns.not_modified_since),
|
||||
LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since))));
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -1675,9 +1663,9 @@ neon_prefetch_response_usable(XLogRecPtr request_lsn, XLogRecPtr not_modified_si
|
||||
*/
|
||||
|
||||
/* this follows from the checks above */
|
||||
Assert(request_lsn >= slot->not_modified_since);
|
||||
Assert(request_lsns.request_lsn >= slot->request_lsns.not_modified_since);
|
||||
|
||||
return not_modified_since <= slot->request_lsn;
|
||||
return request_lsns.not_modified_since <= slot->request_lsns.request_lsn;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1689,8 +1677,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
|
||||
bool exists;
|
||||
NeonResponse *resp;
|
||||
BlockNumber n_blocks;
|
||||
XLogRecPtr request_lsn;
|
||||
XLogRecPtr not_modified_since;
|
||||
neon_request_lsns request_lsns;
|
||||
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
@@ -1745,15 +1732,15 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
|
||||
return false;
|
||||
}
|
||||
|
||||
neon_get_request_lsn(InfoFromSMgrRel(reln), forkNum, REL_METADATA_PSEUDO_BLOCKNO,
|
||||
&request_lsn, ¬_modified_since);
|
||||
request_lsns = neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, REL_METADATA_PSEUDO_BLOCKNO);
|
||||
{
|
||||
NeonExistsRequest request = {
|
||||
.req.tag = T_NeonExistsRequest,
|
||||
.req.lsn = request_lsn,
|
||||
.req.not_modified_since = not_modified_since,
|
||||
.req.lsn = request_lsns.request_lsn,
|
||||
.req.not_modified_since = request_lsns.not_modified_since,
|
||||
.rinfo = InfoFromSMgrRel(reln),
|
||||
.forknum = forkNum};
|
||||
.forknum = forkNum
|
||||
};
|
||||
|
||||
resp = page_server_request(&request);
|
||||
}
|
||||
@@ -1770,7 +1757,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
|
||||
errmsg(NEON_TAG "could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X",
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forkNum,
|
||||
(uint32) (request_lsn >> 32), (uint32) request_lsn),
|
||||
LSN_FORMAT_ARGS(request_lsns.request_lsn)),
|
||||
errdetail("page server returned error: %s",
|
||||
((NeonErrorResponse *) resp)->message)));
|
||||
break;
|
||||
@@ -2135,7 +2122,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
|
||||
|
||||
CopyNRelFileInfoToBufTag(tag, InfoFromSMgrRel(reln));
|
||||
|
||||
ring_index = prefetch_register_buffer(tag, NULL, NULL);
|
||||
ring_index = prefetch_register_buffer(tag, NULL);
|
||||
|
||||
Assert(ring_index < MyPState->ring_unused &&
|
||||
MyPState->ring_last <= ring_index);
|
||||
@@ -2188,10 +2175,10 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
|
||||
void
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, char *buffer)
|
||||
neon_request_lsns request_lsns, char *buffer)
|
||||
#else
|
||||
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, void *buffer)
|
||||
neon_request_lsns request_lsns, void *buffer)
|
||||
#endif
|
||||
{
|
||||
NeonResponse *resp;
|
||||
@@ -2223,7 +2210,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
* value of the LwLsn cache when the entry is not found.
|
||||
*/
|
||||
if (RecoveryInProgress() && !(MyBackendType == B_STARTUP))
|
||||
XLogWaitForReplayOf(request_lsn);
|
||||
XLogWaitForReplayOf(request_lsns.request_lsn);
|
||||
|
||||
/*
|
||||
* Try to find prefetched page in the list of received pages.
|
||||
@@ -2234,7 +2221,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
if (entry != NULL)
|
||||
{
|
||||
slot = entry->slot;
|
||||
if (neon_prefetch_response_usable(request_lsn, not_modified_since, slot))
|
||||
if (neon_prefetch_response_usable(request_lsns, slot))
|
||||
{
|
||||
ring_index = slot->my_ring_index;
|
||||
pgBufferUsage.prefetch.hits += 1;
|
||||
@@ -2268,8 +2255,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
{
|
||||
pgBufferUsage.prefetch.misses += 1;
|
||||
|
||||
ring_index = prefetch_register_buffer(buftag, &request_lsn,
|
||||
¬_modified_since);
|
||||
ring_index = prefetch_register_buffer(buftag, &request_lsns);
|
||||
slot = GetPrfSlot(ring_index);
|
||||
}
|
||||
else
|
||||
@@ -2310,7 +2296,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
slot->shard_no, blkno,
|
||||
RelFileInfoFmt(rinfo),
|
||||
forkNum,
|
||||
(uint32) (request_lsn >> 32), (uint32) request_lsn),
|
||||
LSN_FORMAT_ARGS(request_lsns.request_lsn)),
|
||||
errdetail("page server returned error: %s",
|
||||
((NeonErrorResponse *) resp)->message)));
|
||||
break;
|
||||
@@ -2333,8 +2319,7 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, char *buffer
|
||||
neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer)
|
||||
#endif
|
||||
{
|
||||
XLogRecPtr request_lsn;
|
||||
XLogRecPtr not_modified_since;
|
||||
neon_request_lsns request_lsns;
|
||||
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
@@ -2359,9 +2344,8 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
return;
|
||||
}
|
||||
|
||||
neon_get_request_lsn(InfoFromSMgrRel(reln), forkNum, blkno,
|
||||
&request_lsn, ¬_modified_since);
|
||||
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsn, not_modified_since, buffer);
|
||||
request_lsns = neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno);
|
||||
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsns, buffer);
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
|
||||
@@ -2530,8 +2514,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
{
|
||||
NeonResponse *resp;
|
||||
BlockNumber n_blocks;
|
||||
XLogRecPtr request_lsn;
|
||||
XLogRecPtr not_modified_since;
|
||||
neon_request_lsns request_lsns;
|
||||
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
@@ -2558,13 +2541,12 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
return n_blocks;
|
||||
}
|
||||
|
||||
neon_get_request_lsn(InfoFromSMgrRel(reln), forknum, REL_METADATA_PSEUDO_BLOCKNO,
|
||||
&request_lsn, ¬_modified_since);
|
||||
request_lsns = neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, REL_METADATA_PSEUDO_BLOCKNO);
|
||||
{
|
||||
NeonNblocksRequest request = {
|
||||
.req.tag = T_NeonNblocksRequest,
|
||||
.req.lsn = request_lsn,
|
||||
.req.not_modified_since = not_modified_since,
|
||||
.req.lsn = request_lsns.request_lsn,
|
||||
.req.not_modified_since = request_lsns.not_modified_since,
|
||||
.rinfo = InfoFromSMgrRel(reln),
|
||||
.forknum = forknum,
|
||||
};
|
||||
@@ -2584,7 +2566,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
errmsg(NEON_TAG "could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X",
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forknum,
|
||||
(uint32) (request_lsn >> 32), (uint32) request_lsn),
|
||||
LSN_FORMAT_ARGS(request_lsns.request_lsn)),
|
||||
errdetail("page server returned error: %s",
|
||||
((NeonErrorResponse *) resp)->message)));
|
||||
break;
|
||||
@@ -2595,10 +2577,10 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
update_cached_relsize(InfoFromSMgrRel(reln), forknum, n_blocks);
|
||||
|
||||
neon_log(SmgrTrace, "neon_nblocks: rel %u/%u/%u fork %u (request LSN %X/%08X): %u blocks",
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forknum,
|
||||
(uint32) (request_lsn >> 32), (uint32) request_lsn,
|
||||
n_blocks);
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)),
|
||||
forknum,
|
||||
LSN_FORMAT_ARGS(request_lsns.request_lsn),
|
||||
n_blocks);
|
||||
|
||||
pfree(resp);
|
||||
return n_blocks;
|
||||
@@ -2612,17 +2594,15 @@ neon_dbsize(Oid dbNode)
|
||||
{
|
||||
NeonResponse *resp;
|
||||
int64 db_size;
|
||||
XLogRecPtr request_lsn,
|
||||
not_modified_since;
|
||||
neon_request_lsns request_lsns;
|
||||
NRelFileInfo dummy_node = {0};
|
||||
|
||||
neon_get_request_lsn(dummy_node, MAIN_FORKNUM, REL_METADATA_PSEUDO_BLOCKNO,
|
||||
&request_lsn, ¬_modified_since);
|
||||
request_lsns = neon_get_request_lsns(dummy_node, MAIN_FORKNUM, REL_METADATA_PSEUDO_BLOCKNO);
|
||||
{
|
||||
NeonDbSizeRequest request = {
|
||||
.req.tag = T_NeonDbSizeRequest,
|
||||
.req.lsn = request_lsn,
|
||||
.req.not_modified_since = not_modified_since,
|
||||
.req.lsn = request_lsns.request_lsn,
|
||||
.req.not_modified_since = request_lsns.not_modified_since,
|
||||
.dbNode = dbNode,
|
||||
};
|
||||
|
||||
@@ -2639,8 +2619,7 @@ neon_dbsize(Oid dbNode)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_IO_ERROR),
|
||||
errmsg(NEON_TAG "could not read db size of db %u from page server at lsn %X/%08X",
|
||||
dbNode,
|
||||
(uint32) (request_lsn >> 32), (uint32) request_lsn),
|
||||
dbNode, LSN_FORMAT_ARGS(request_lsns.request_lsn)),
|
||||
errdetail("page server returned error: %s",
|
||||
((NeonErrorResponse *) resp)->message)));
|
||||
break;
|
||||
@@ -2650,9 +2629,7 @@ neon_dbsize(Oid dbNode)
|
||||
}
|
||||
|
||||
neon_log(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes",
|
||||
dbNode,
|
||||
(uint32) (request_lsn >> 32), (uint32) request_lsn,
|
||||
db_size);
|
||||
dbNode, LSN_FORMAT_ARGS(request_lsns.request_lsn), db_size);
|
||||
|
||||
pfree(resp);
|
||||
return db_size;
|
||||
|
||||
@@ -48,10 +48,10 @@ PG_FUNCTION_INFO_V1(neon_xlogflush);
|
||||
*/
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, char *buffer);
|
||||
neon_request_lsns request_lsns, char *buffer);
|
||||
#else
|
||||
typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, void *buffer);
|
||||
neon_request_lsns request_lsns, void *buffer);
|
||||
#endif
|
||||
|
||||
static neon_read_at_lsn_type neon_read_at_lsn_ptr;
|
||||
@@ -298,9 +298,7 @@ get_raw_page_at_lsn(PG_FUNCTION_ARGS)
|
||||
text *relname;
|
||||
text *forkname;
|
||||
uint32 blkno;
|
||||
|
||||
XLogRecPtr request_lsn;
|
||||
XLogRecPtr not_modified_since;
|
||||
neon_request_lsns request_lsns;
|
||||
|
||||
if (PG_NARGS() != 5)
|
||||
elog(ERROR, "unexpected number of arguments in SQL function signature");
|
||||
@@ -312,8 +310,8 @@ get_raw_page_at_lsn(PG_FUNCTION_ARGS)
|
||||
forkname = PG_GETARG_TEXT_PP(1);
|
||||
blkno = PG_GETARG_UINT32(2);
|
||||
|
||||
request_lsn = PG_ARGISNULL(3) ? GetXLogInsertRecPtr() : PG_GETARG_LSN(3);
|
||||
not_modified_since = PG_ARGISNULL(4) ? request_lsn : PG_GETARG_LSN(4);
|
||||
request_lsns.request_lsn = PG_ARGISNULL(3) ? GetXLogInsertRecPtr() : PG_GETARG_LSN(3);
|
||||
request_lsns.not_modified_since = PG_ARGISNULL(4) ? request_lsns.request_lsn : PG_GETARG_LSN(4);
|
||||
|
||||
if (!superuser())
|
||||
ereport(ERROR,
|
||||
@@ -367,7 +365,8 @@ get_raw_page_at_lsn(PG_FUNCTION_ARGS)
|
||||
SET_VARSIZE(raw_page, BLCKSZ + VARHDRSZ);
|
||||
raw_page_data = VARDATA(raw_page);
|
||||
|
||||
neon_read_at_lsn(InfoFromRelation(rel), forknum, blkno, request_lsn, not_modified_since, raw_page_data);
|
||||
neon_read_at_lsn(InfoFromRelation(rel), forknum, blkno, request_lsns,
|
||||
raw_page_data);
|
||||
|
||||
relation_close(rel, AccessShareLock);
|
||||
|
||||
@@ -413,19 +412,18 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
|
||||
|
||||
ForkNumber forknum = PG_GETARG_UINT32(3);
|
||||
uint32 blkno = PG_GETARG_UINT32(4);
|
||||
XLogRecPtr request_lsn;
|
||||
XLogRecPtr not_modified_since;
|
||||
neon_request_lsns request_lsns;
|
||||
|
||||
/* Initialize buffer to copy to */
|
||||
bytea *raw_page = (bytea *) palloc(BLCKSZ + VARHDRSZ);
|
||||
|
||||
request_lsn = PG_ARGISNULL(5) ? GetXLogInsertRecPtr() : PG_GETARG_LSN(5);
|
||||
not_modified_since = PG_ARGISNULL(6) ? request_lsn : PG_GETARG_LSN(6);
|
||||
request_lsns.request_lsn = PG_ARGISNULL(5) ? GetXLogInsertRecPtr() : PG_GETARG_LSN(5);
|
||||
request_lsns.not_modified_since = PG_ARGISNULL(6) ? request_lsns.request_lsn : PG_GETARG_LSN(6);
|
||||
|
||||
SET_VARSIZE(raw_page, BLCKSZ + VARHDRSZ);
|
||||
raw_page_data = VARDATA(raw_page);
|
||||
|
||||
neon_read_at_lsn(rinfo, forknum, blkno, request_lsn, not_modified_since, raw_page_data);
|
||||
neon_read_at_lsn(rinfo, forknum, blkno, request_lsns, raw_page_data);
|
||||
PG_RETURN_BYTEA_P(raw_page);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user