|
|
|
|
@@ -168,8 +168,8 @@ typedef enum PrefetchStatus
|
|
|
|
|
typedef struct PrefetchRequest
|
|
|
|
|
{
|
|
|
|
|
BufferTag buftag; /* must be first entry in the struct */
|
|
|
|
|
XLogRecPtr effective_request_lsn;
|
|
|
|
|
XLogRecPtr actual_request_lsn;
|
|
|
|
|
XLogRecPtr request_lsn;
|
|
|
|
|
XLogRecPtr not_modified_since;
|
|
|
|
|
NeonResponse *response; /* may be null */
|
|
|
|
|
PrefetchStatus status;
|
|
|
|
|
shardno_t shard_no;
|
|
|
|
|
@@ -269,19 +269,17 @@ static PrefetchState *MyPState;
|
|
|
|
|
) \
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
static XLogRecPtr prefetch_lsn = 0;
|
|
|
|
|
|
|
|
|
|
static bool compact_prefetch_buffers(void);
|
|
|
|
|
static void consume_prefetch_responses(void);
|
|
|
|
|
static uint64 prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn);
|
|
|
|
|
static uint64 prefetch_register_buffer(BufferTag tag, XLogRecPtr *force_request_lsn, XLogRecPtr *force_not_modified_since);
|
|
|
|
|
static bool prefetch_read(PrefetchRequest *slot);
|
|
|
|
|
static void prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn);
|
|
|
|
|
static void prefetch_do_request(PrefetchRequest *slot, XLogRecPtr *force_request_lsn, XLogRecPtr *force_not_modified_since);
|
|
|
|
|
static bool prefetch_wait_for(uint64 ring_index);
|
|
|
|
|
static void prefetch_cleanup_trailing_unused(void);
|
|
|
|
|
static inline void prefetch_set_unused(uint64 ring_index);
|
|
|
|
|
|
|
|
|
|
static XLogRecPtr neon_get_request_lsn(bool *latest, NRelFileInfo rinfo,
|
|
|
|
|
ForkNumber forknum, BlockNumber blkno);
|
|
|
|
|
static void neon_get_request_lsn(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
|
|
|
|
XLogRecPtr *request_lsn, XLogRecPtr *not_modified_since);
|
|
|
|
|
|
|
|
|
|
static bool
|
|
|
|
|
compact_prefetch_buffers(void)
|
|
|
|
|
@@ -338,8 +336,8 @@ compact_prefetch_buffers(void)
|
|
|
|
|
target_slot->shard_no = source_slot->shard_no;
|
|
|
|
|
target_slot->status = source_slot->status;
|
|
|
|
|
target_slot->response = source_slot->response;
|
|
|
|
|
target_slot->effective_request_lsn = source_slot->effective_request_lsn;
|
|
|
|
|
target_slot->actual_request_lsn = source_slot->actual_request_lsn;
|
|
|
|
|
target_slot->request_lsn = source_slot->request_lsn;
|
|
|
|
|
target_slot->not_modified_since = source_slot->not_modified_since;
|
|
|
|
|
target_slot->my_ring_index = empty_ring_index;
|
|
|
|
|
|
|
|
|
|
prfh_delete(MyPState->prf_hash, source_slot);
|
|
|
|
|
@@ -358,7 +356,8 @@ compact_prefetch_buffers(void)
|
|
|
|
|
};
|
|
|
|
|
source_slot->response = NULL;
|
|
|
|
|
source_slot->my_ring_index = 0;
|
|
|
|
|
source_slot->effective_request_lsn = 0;
|
|
|
|
|
source_slot->request_lsn = InvalidXLogRecPtr;
|
|
|
|
|
source_slot->not_modified_since = InvalidXLogRecPtr;
|
|
|
|
|
|
|
|
|
|
/* update bookkeeping */
|
|
|
|
|
n_moved++;
|
|
|
|
|
@@ -684,54 +683,35 @@ prefetch_set_unused(uint64 ring_index)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void
|
|
|
|
|
prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn)
|
|
|
|
|
prefetch_do_request(PrefetchRequest *slot, XLogRecPtr *force_request_lsn, XLogRecPtr *force_not_modified_since)
|
|
|
|
|
{
|
|
|
|
|
bool found;
|
|
|
|
|
NeonGetPageRequest request = {
|
|
|
|
|
.req.tag = T_NeonGetPageRequest,
|
|
|
|
|
.req.latest = false,
|
|
|
|
|
.req.lsn = 0,
|
|
|
|
|
/* lsn and not_modified_since are filled in below */
|
|
|
|
|
.rinfo = BufTagGetNRelFileInfo(slot->buftag),
|
|
|
|
|
.forknum = slot->buftag.forkNum,
|
|
|
|
|
.blkno = slot->buftag.blockNum,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
if (force_lsn && force_latest)
|
|
|
|
|
Assert(((force_request_lsn != NULL) == (force_not_modified_since != NULL)));
|
|
|
|
|
|
|
|
|
|
if (force_request_lsn)
|
|
|
|
|
{
|
|
|
|
|
request.req.lsn = *force_lsn;
|
|
|
|
|
request.req.latest = *force_latest;
|
|
|
|
|
slot->actual_request_lsn = slot->effective_request_lsn = *force_lsn;
|
|
|
|
|
request.req.lsn = *force_request_lsn;
|
|
|
|
|
request.req.not_modified_since = *force_not_modified_since;
|
|
|
|
|
slot->request_lsn = *force_request_lsn;
|
|
|
|
|
slot->not_modified_since = *force_not_modified_since;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
XLogRecPtr lsn = neon_get_request_lsn(
|
|
|
|
|
&request.req.latest,
|
|
|
|
|
BufTagGetNRelFileInfo(slot->buftag),
|
|
|
|
|
slot->buftag.forkNum,
|
|
|
|
|
slot->buftag.blockNum
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Note: effective_request_lsn is potentially higher than the
|
|
|
|
|
* requested LSN, but still correct:
|
|
|
|
|
*
|
|
|
|
|
* We know there are no changes between the actual requested LSN and
|
|
|
|
|
* the value of effective_request_lsn: If there were, the page would
|
|
|
|
|
* have been in cache and evicted between those LSN values, which then
|
|
|
|
|
* would have had to result in a larger request LSN for this page.
|
|
|
|
|
*
|
|
|
|
|
* It is possible that a concurrent backend loads the page, modifies
|
|
|
|
|
* it and then evicts it again, but the LSN of that eviction cannot be
|
|
|
|
|
* smaller than the current WAL insert/redo pointer, which is already
|
|
|
|
|
* larger than this prefetch_lsn. So in any case, that would
|
|
|
|
|
* invalidate this cache.
|
|
|
|
|
*
|
|
|
|
|
* The best LSN to use for effective_request_lsn would be
|
|
|
|
|
* XLogCtl->Insert.RedoRecPtr, but that's expensive to access.
|
|
|
|
|
*/
|
|
|
|
|
slot->actual_request_lsn = request.req.lsn = lsn;
|
|
|
|
|
prefetch_lsn = Max(prefetch_lsn, lsn);
|
|
|
|
|
slot->effective_request_lsn = prefetch_lsn;
|
|
|
|
|
neon_get_request_lsn(BufTagGetNRelFileInfo(slot->buftag),
|
|
|
|
|
slot->buftag.forkNum,
|
|
|
|
|
slot->buftag.blockNum,
|
|
|
|
|
&request.req.lsn,
|
|
|
|
|
&request.req.not_modified_since);
|
|
|
|
|
slot->request_lsn = request.req.lsn;
|
|
|
|
|
slot->not_modified_since = request.req.not_modified_since;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Assert(slot->response == NULL);
|
|
|
|
|
@@ -749,7 +729,6 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
|
|
|
|
|
/* update slot state */
|
|
|
|
|
slot->status = PRFS_REQUESTED;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
prfh_insert(MyPState->prf_hash, slot, &found);
|
|
|
|
|
Assert(!found);
|
|
|
|
|
}
|
|
|
|
|
@@ -759,22 +738,25 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
|
|
|
|
|
*
|
|
|
|
|
* Register that we may want the contents of BufferTag in the near future.
|
|
|
|
|
*
|
|
|
|
|
* If force_latest and force_lsn are not NULL, those values are sent to the
|
|
|
|
|
* pageserver. If they are NULL, we utilize the lastWrittenLsn -infrastructure
|
|
|
|
|
* to fill in these values manually.
|
|
|
|
|
* If force_request_lsn and force_not_modified_since are not NULL, those
|
|
|
|
|
* values are sent to the pageserver. If they are NULL, we utilize the
|
|
|
|
|
* lastWrittenLsn -infrastructure to fill them in.
|
|
|
|
|
*
|
|
|
|
|
* NOTE: this function may indirectly update MyPState->pfs_hash; which
|
|
|
|
|
* invalidates any active pointers into the hash table.
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
static uint64
|
|
|
|
|
prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn)
|
|
|
|
|
prefetch_register_buffer(BufferTag tag, XLogRecPtr *force_request_lsn,
|
|
|
|
|
XLogRecPtr *force_not_modified_since)
|
|
|
|
|
{
|
|
|
|
|
uint64 ring_index;
|
|
|
|
|
PrefetchRequest req;
|
|
|
|
|
PrefetchRequest *slot;
|
|
|
|
|
PrfHashEntry *entry;
|
|
|
|
|
|
|
|
|
|
Assert(((force_request_lsn != NULL) == (force_not_modified_since != NULL)));
|
|
|
|
|
|
|
|
|
|
/* use an intermediate PrefetchRequest struct to ensure correct alignment */
|
|
|
|
|
req.buftag = tag;
|
|
|
|
|
Retry:
|
|
|
|
|
@@ -795,37 +777,31 @@ Retry:
|
|
|
|
|
* If we want a specific lsn, we do not accept requests that were made
|
|
|
|
|
* with a potentially different LSN.
|
|
|
|
|
*/
|
|
|
|
|
if (force_latest && force_lsn)
|
|
|
|
|
if (force_request_lsn)
|
|
|
|
|
{
|
|
|
|
|
/*
|
|
|
|
|
* if we want the latest version, any effective_request_lsn <
|
|
|
|
|
* request lsn is OK
|
|
|
|
|
* The not_changed_since..request_lsn range of each request is
|
|
|
|
|
* effectively a claim that the page has not been modified between
|
|
|
|
|
* whose LSNs. Therefore, if the range of the old request in the
|
|
|
|
|
* queue overlaps with the new request, we know that the the page
|
|
|
|
|
* hasn't been modified in the union of the ranges. We can reuse
|
|
|
|
|
* the old request in that case.
|
|
|
|
|
*
|
|
|
|
|
* The new request's LSN should never be older than the old one,
|
|
|
|
|
* so don't bother checking that case.
|
|
|
|
|
*/
|
|
|
|
|
if (*force_latest)
|
|
|
|
|
if (*force_request_lsn >= slot->not_modified_since &&
|
|
|
|
|
*force_not_modified_since <= slot->request_lsn)
|
|
|
|
|
{
|
|
|
|
|
if (*force_lsn > slot->effective_request_lsn)
|
|
|
|
|
{
|
|
|
|
|
if (!prefetch_wait_for(ring_index))
|
|
|
|
|
goto Retry;
|
|
|
|
|
prefetch_set_unused(ring_index);
|
|
|
|
|
entry = NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* the old request overlaps with the new one; keep it */
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* if we don't want the latest version, only accept requests with
|
|
|
|
|
* the exact same LSN
|
|
|
|
|
*/
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
if (*force_lsn != slot->effective_request_lsn)
|
|
|
|
|
{
|
|
|
|
|
if (!prefetch_wait_for(ring_index))
|
|
|
|
|
goto Retry;
|
|
|
|
|
prefetch_set_unused(ring_index);
|
|
|
|
|
entry = NULL;
|
|
|
|
|
}
|
|
|
|
|
/* Wait for the old request to finish and discard it */
|
|
|
|
|
if (!prefetch_wait_for(ring_index))
|
|
|
|
|
goto Retry;
|
|
|
|
|
prefetch_set_unused(ring_index);
|
|
|
|
|
entry = NULL;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -921,7 +897,7 @@ Retry:
|
|
|
|
|
slot->shard_no = get_shard_number(&tag);
|
|
|
|
|
slot->my_ring_index = ring_index;
|
|
|
|
|
|
|
|
|
|
prefetch_do_request(slot, force_latest, force_lsn);
|
|
|
|
|
prefetch_do_request(slot, force_request_lsn, force_not_modified_since);
|
|
|
|
|
Assert(slot->status == PRFS_REQUESTED);
|
|
|
|
|
Assert(MyPState->ring_last <= ring_index &&
|
|
|
|
|
ring_index < MyPState->ring_unused);
|
|
|
|
|
@@ -997,7 +973,66 @@ nm_pack_request(NeonRequest *msg)
|
|
|
|
|
StringInfoData s;
|
|
|
|
|
|
|
|
|
|
initStringInfo(&s);
|
|
|
|
|
pq_sendbyte(&s, msg->tag);
|
|
|
|
|
|
|
|
|
|
if (neon_protocol_version >= 2)
|
|
|
|
|
{
|
|
|
|
|
pq_sendbyte(&s, msg->tag);
|
|
|
|
|
pq_sendint64(&s, msg->lsn);
|
|
|
|
|
pq_sendint64(&s, msg->not_modified_since);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
NeonMessageTag tag;
|
|
|
|
|
bool latest;
|
|
|
|
|
XLogRecPtr lsn;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* In primary, we always request the latest page version.
|
|
|
|
|
*/
|
|
|
|
|
if (!RecoveryInProgress())
|
|
|
|
|
{
|
|
|
|
|
latest = true;
|
|
|
|
|
lsn = msg->not_modified_since;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
/*
|
|
|
|
|
* In the current protocol, we cannot represent that we want to read
|
|
|
|
|
* page at LSN X, and we know that it hasn't been modified since Y. We
|
|
|
|
|
* can either use 'not_modified_lsn' as the request LSN, and risk
|
|
|
|
|
* getting an error if that LSN is too old and has already fallen out
|
|
|
|
|
* of the pageserver's GC horizon, or we can send 'request_lsn',
|
|
|
|
|
* causing the pageserver to possibly wait for the recent WAL to
|
|
|
|
|
* arrive unnecessarily. Or something in between. We choose to use the
|
|
|
|
|
* old LSN and risk GC errors, because that's what we've done
|
|
|
|
|
* historically.
|
|
|
|
|
*/
|
|
|
|
|
latest = false;
|
|
|
|
|
lsn = msg->not_modified_since;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
switch(msg->tag)
|
|
|
|
|
{
|
|
|
|
|
case T_NeonExistsV2Request:
|
|
|
|
|
tag = T_NeonExistsRequest;
|
|
|
|
|
break;
|
|
|
|
|
case T_NeonNblocksV2Request:
|
|
|
|
|
tag = T_NeonNblocksRequest;
|
|
|
|
|
break;
|
|
|
|
|
case T_NeonGetPageV2Request:
|
|
|
|
|
tag = T_NeonGetPageRequest;
|
|
|
|
|
break;
|
|
|
|
|
case T_NeonDbSizeV2Request:
|
|
|
|
|
tag = T_NeonDbSizeRequest;
|
|
|
|
|
break;
|
|
|
|
|
case T_NeonGetSlruSegmentV2Request:
|
|
|
|
|
tag = T_NeonGetSlruSegmentRequest;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
pq_sendbyte(&s, tag);
|
|
|
|
|
pq_sendbyte(&s, latest);
|
|
|
|
|
pq_sendint64(&s, lsn);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
switch (messageTag(msg))
|
|
|
|
|
{
|
|
|
|
|
@@ -1006,8 +1041,6 @@ nm_pack_request(NeonRequest *msg)
|
|
|
|
|
{
|
|
|
|
|
NeonExistsRequest *msg_req = (NeonExistsRequest *) msg;
|
|
|
|
|
|
|
|
|
|
pq_sendbyte(&s, msg_req->req.latest);
|
|
|
|
|
pq_sendint64(&s, msg_req->req.lsn);
|
|
|
|
|
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
|
|
|
|
|
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
|
|
|
|
|
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
|
|
|
|
|
@@ -1019,8 +1052,6 @@ nm_pack_request(NeonRequest *msg)
|
|
|
|
|
{
|
|
|
|
|
NeonNblocksRequest *msg_req = (NeonNblocksRequest *) msg;
|
|
|
|
|
|
|
|
|
|
pq_sendbyte(&s, msg_req->req.latest);
|
|
|
|
|
pq_sendint64(&s, msg_req->req.lsn);
|
|
|
|
|
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
|
|
|
|
|
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
|
|
|
|
|
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
|
|
|
|
|
@@ -1032,8 +1063,6 @@ nm_pack_request(NeonRequest *msg)
|
|
|
|
|
{
|
|
|
|
|
NeonDbSizeRequest *msg_req = (NeonDbSizeRequest *) msg;
|
|
|
|
|
|
|
|
|
|
pq_sendbyte(&s, msg_req->req.latest);
|
|
|
|
|
pq_sendint64(&s, msg_req->req.lsn);
|
|
|
|
|
pq_sendint32(&s, msg_req->dbNode);
|
|
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
@@ -1042,8 +1071,6 @@ nm_pack_request(NeonRequest *msg)
|
|
|
|
|
{
|
|
|
|
|
NeonGetPageRequest *msg_req = (NeonGetPageRequest *) msg;
|
|
|
|
|
|
|
|
|
|
pq_sendbyte(&s, msg_req->req.latest);
|
|
|
|
|
pq_sendint64(&s, msg_req->req.lsn);
|
|
|
|
|
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
|
|
|
|
|
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
|
|
|
|
|
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
|
|
|
|
|
@@ -1057,8 +1084,6 @@ nm_pack_request(NeonRequest *msg)
|
|
|
|
|
{
|
|
|
|
|
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
|
|
|
|
|
|
|
|
|
|
pq_sendbyte(&s, msg_req->req.latest);
|
|
|
|
|
pq_sendint64(&s, msg_req->req.lsn);
|
|
|
|
|
pq_sendbyte(&s, msg_req->kind);
|
|
|
|
|
pq_sendint32(&s, msg_req->segno);
|
|
|
|
|
|
|
|
|
|
@@ -1209,7 +1234,7 @@ nm_to_string(NeonMessage *msg)
|
|
|
|
|
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
|
|
|
|
|
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
|
|
|
|
|
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
|
|
|
|
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
|
|
|
|
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
|
|
|
|
|
appendStringInfoChar(&s, '}');
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
@@ -1222,7 +1247,7 @@ nm_to_string(NeonMessage *msg)
|
|
|
|
|
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
|
|
|
|
|
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
|
|
|
|
|
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
|
|
|
|
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
|
|
|
|
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
|
|
|
|
|
appendStringInfoChar(&s, '}');
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
@@ -1236,7 +1261,7 @@ nm_to_string(NeonMessage *msg)
|
|
|
|
|
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
|
|
|
|
|
appendStringInfo(&s, ", \"blkno\": %u", msg_req->blkno);
|
|
|
|
|
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
|
|
|
|
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
|
|
|
|
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
|
|
|
|
|
appendStringInfoChar(&s, '}');
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
@@ -1247,7 +1272,7 @@ nm_to_string(NeonMessage *msg)
|
|
|
|
|
appendStringInfoString(&s, "{\"type\": \"NeonDbSizeRequest\"");
|
|
|
|
|
appendStringInfo(&s, ", \"dbnode\": \"%u\"", msg_req->dbNode);
|
|
|
|
|
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
|
|
|
|
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
|
|
|
|
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
|
|
|
|
|
appendStringInfoChar(&s, '}');
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
@@ -1259,7 +1284,7 @@ nm_to_string(NeonMessage *msg)
|
|
|
|
|
appendStringInfo(&s, ", \"kind\": %u", msg_req->kind);
|
|
|
|
|
appendStringInfo(&s, ", \"segno\": %u", msg_req->segno);
|
|
|
|
|
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
|
|
|
|
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
|
|
|
|
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
|
|
|
|
|
appendStringInfoChar(&s, '}');
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
@@ -1531,44 +1556,36 @@ nm_adjust_lsn(XLogRecPtr lsn)
|
|
|
|
|
/*
|
|
|
|
|
* Return LSN for requesting pages and number of blocks from page server
|
|
|
|
|
*/
|
|
|
|
|
static XLogRecPtr
|
|
|
|
|
neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno)
|
|
|
|
|
static void
|
|
|
|
|
neon_get_request_lsn(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
|
|
|
|
XLogRecPtr *request_lsn, XLogRecPtr *not_modified_since)
|
|
|
|
|
{
|
|
|
|
|
XLogRecPtr lsn;
|
|
|
|
|
|
|
|
|
|
if (RecoveryInProgress())
|
|
|
|
|
{
|
|
|
|
|
/*
|
|
|
|
|
* We don't know if WAL has been generated but not yet replayed, so
|
|
|
|
|
* we're conservative in our estimates about latest pages.
|
|
|
|
|
*/
|
|
|
|
|
*latest = false;
|
|
|
|
|
/* Request the page at the last replayed LSN. */
|
|
|
|
|
*request_lsn = GetXLogReplayRecPtr(NULL);
|
|
|
|
|
*not_modified_since = GetLastWrittenLSN(rinfo, forknum, blkno);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Get the last written LSN of this page.
|
|
|
|
|
*/
|
|
|
|
|
lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
|
|
|
|
|
lsn = nm_adjust_lsn(lsn);
|
|
|
|
|
|
|
|
|
|
neon_log(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ",
|
|
|
|
|
(uint32) ((lsn) >> 32), (uint32) (lsn));
|
|
|
|
|
neon_log(DEBUG1, "neon_get_request_lsn request lsn %X/%X, not_modified_since %X/%X",
|
|
|
|
|
LSN_FORMAT_ARGS(*request_lsn), LSN_FORMAT_ARGS(*not_modified_since));
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
XLogRecPtr last_written_lsn;
|
|
|
|
|
XLogRecPtr flushlsn;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Use the latest LSN that was evicted from the buffer cache. Any
|
|
|
|
|
* pages modified by later WAL records must still in the buffer cache,
|
|
|
|
|
* so our request cannot concern those.
|
|
|
|
|
* Use the latest LSN that was evicted from the buffer cache as the
|
|
|
|
|
* 'not_modified_since' hint. Any pages modified by later WAL records
|
|
|
|
|
* must still in the buffer cache, so our request cannot concern
|
|
|
|
|
* those.
|
|
|
|
|
*/
|
|
|
|
|
*latest = true;
|
|
|
|
|
lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
|
|
|
|
|
Assert(lsn != InvalidXLogRecPtr);
|
|
|
|
|
last_written_lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
|
|
|
|
|
Assert(last_written_lsn != InvalidXLogRecPtr);
|
|
|
|
|
neon_log(DEBUG1, "neon_get_request_lsn GetLastWrittenLSN lsn %X/%X ",
|
|
|
|
|
(uint32) ((lsn) >> 32), (uint32) (lsn));
|
|
|
|
|
LSN_FORMAT_ARGS(last_written_lsn));
|
|
|
|
|
|
|
|
|
|
lsn = nm_adjust_lsn(lsn);
|
|
|
|
|
last_written_lsn = nm_adjust_lsn(last_written_lsn);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Is it possible that the last-written LSN is ahead of last flush
|
|
|
|
|
@@ -1583,16 +1600,25 @@ neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, Block
|
|
|
|
|
#else
|
|
|
|
|
flushlsn = GetFlushRecPtr();
|
|
|
|
|
#endif
|
|
|
|
|
if (lsn > flushlsn)
|
|
|
|
|
if (last_written_lsn > flushlsn)
|
|
|
|
|
{
|
|
|
|
|
neon_log(DEBUG5, "last-written LSN %X/%X is ahead of last flushed LSN %X/%X",
|
|
|
|
|
(uint32) (lsn >> 32), (uint32) lsn,
|
|
|
|
|
(uint32) (flushlsn >> 32), (uint32) flushlsn);
|
|
|
|
|
XLogFlush(lsn);
|
|
|
|
|
LSN_FORMAT_ARGS(last_written_lsn),
|
|
|
|
|
LSN_FORMAT_ARGS(flushlsn));
|
|
|
|
|
XLogFlush(last_written_lsn);
|
|
|
|
|
flushlsn = last_written_lsn;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return lsn;
|
|
|
|
|
/*
|
|
|
|
|
* Request the latest version of the page. The most up-to-date request
|
|
|
|
|
* LSN we could use would be the current insert LSN, but to avoid the
|
|
|
|
|
* overhead of looking it up, use 'flushlsn' instead. This relies on the
|
|
|
|
|
* assumption that if the page was modified since the last WAL flush, it
|
|
|
|
|
* should still be in the buffer cache, and we wouldn't be requesting it.
|
|
|
|
|
*/
|
|
|
|
|
*request_lsn = flushlsn;
|
|
|
|
|
*not_modified_since = last_written_lsn;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
@@ -1604,8 +1630,8 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
|
|
|
|
|
bool exists;
|
|
|
|
|
NeonResponse *resp;
|
|
|
|
|
BlockNumber n_blocks;
|
|
|
|
|
bool latest;
|
|
|
|
|
XLogRecPtr request_lsn;
|
|
|
|
|
XLogRecPtr not_modified_since;
|
|
|
|
|
|
|
|
|
|
switch (reln->smgr_relpersistence)
|
|
|
|
|
{
|
|
|
|
|
@@ -1660,12 +1686,13 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
request_lsn = neon_get_request_lsn(&latest, InfoFromSMgrRel(reln), forkNum, REL_METADATA_PSEUDO_BLOCKNO);
|
|
|
|
|
neon_get_request_lsn(InfoFromSMgrRel(reln), forkNum, REL_METADATA_PSEUDO_BLOCKNO,
|
|
|
|
|
&request_lsn, ¬_modified_since);
|
|
|
|
|
{
|
|
|
|
|
NeonExistsRequest request = {
|
|
|
|
|
.req.tag = T_NeonExistsRequest,
|
|
|
|
|
.req.latest = latest,
|
|
|
|
|
.req.lsn = request_lsn,
|
|
|
|
|
.req.not_modified_since = not_modified_since,
|
|
|
|
|
.rinfo = InfoFromSMgrRel(reln),
|
|
|
|
|
.forknum = forkNum};
|
|
|
|
|
|
|
|
|
|
@@ -2102,10 +2129,10 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
|
|
|
|
|
void
|
|
|
|
|
#if PG_MAJORVERSION_NUM < 16
|
|
|
|
|
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
|
|
|
|
XLogRecPtr request_lsn, bool request_latest, char *buffer)
|
|
|
|
|
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, char *buffer)
|
|
|
|
|
#else
|
|
|
|
|
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
|
|
|
|
XLogRecPtr request_lsn, bool request_latest, void *buffer)
|
|
|
|
|
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, void *buffer)
|
|
|
|
|
#endif
|
|
|
|
|
{
|
|
|
|
|
NeonResponse *resp;
|
|
|
|
|
@@ -2148,15 +2175,28 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
|
|
|
|
if (entry != NULL)
|
|
|
|
|
{
|
|
|
|
|
slot = entry->slot;
|
|
|
|
|
if (slot->effective_request_lsn >= request_lsn)
|
|
|
|
|
/*
|
|
|
|
|
* The not_changed_since..request_lsn range of each request is
|
|
|
|
|
* effectively a claim that the page has not been modified between
|
|
|
|
|
* those LSNs. Therefore, if the range of the old request in the queue
|
|
|
|
|
* overlaps with the new request, we know that the the page hasn't
|
|
|
|
|
* been modified in the union of the ranges. We can reuse the old
|
|
|
|
|
* request in that case.
|
|
|
|
|
*
|
|
|
|
|
* The new request's LSN should never be older than the old one,
|
|
|
|
|
* so don't bother checking that case.
|
|
|
|
|
*/
|
|
|
|
|
if (request_lsn >= slot->not_modified_since &&
|
|
|
|
|
not_modified_since <= slot->request_lsn)
|
|
|
|
|
{
|
|
|
|
|
ring_index = slot->my_ring_index;
|
|
|
|
|
pgBufferUsage.prefetch.hits += 1;
|
|
|
|
|
}
|
|
|
|
|
else /* the current prefetch LSN is not large
|
|
|
|
|
* enough, so drop the prefetch */
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
/*
|
|
|
|
|
* Cannot use this prefetch, discard it
|
|
|
|
|
*
|
|
|
|
|
* We can't drop cache for not-yet-received requested items. It is
|
|
|
|
|
* unlikely this happens, but it can happen if prefetch distance
|
|
|
|
|
* is large enough and a backend didn't consume all prefetch
|
|
|
|
|
@@ -2181,8 +2221,8 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
|
|
|
|
{
|
|
|
|
|
pgBufferUsage.prefetch.misses += 1;
|
|
|
|
|
|
|
|
|
|
ring_index = prefetch_register_buffer(buftag, &request_latest,
|
|
|
|
|
&request_lsn);
|
|
|
|
|
ring_index = prefetch_register_buffer(buftag, &request_lsn,
|
|
|
|
|
¬_modified_since);
|
|
|
|
|
slot = GetPrfSlot(ring_index);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
@@ -2246,8 +2286,8 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, char *buffer
|
|
|
|
|
neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer)
|
|
|
|
|
#endif
|
|
|
|
|
{
|
|
|
|
|
bool latest;
|
|
|
|
|
XLogRecPtr request_lsn;
|
|
|
|
|
XLogRecPtr not_modified_since;
|
|
|
|
|
|
|
|
|
|
switch (reln->smgr_relpersistence)
|
|
|
|
|
{
|
|
|
|
|
@@ -2272,8 +2312,9 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
request_lsn = neon_get_request_lsn(&latest, InfoFromSMgrRel(reln), forkNum, blkno);
|
|
|
|
|
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsn, latest, buffer);
|
|
|
|
|
neon_get_request_lsn(InfoFromSMgrRel(reln), forkNum, blkno,
|
|
|
|
|
&request_lsn, ¬_modified_since);
|
|
|
|
|
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsn, not_modified_since, buffer);
|
|
|
|
|
|
|
|
|
|
#ifdef DEBUG_COMPARE_LOCAL
|
|
|
|
|
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
|
|
|
|
|
@@ -2442,8 +2483,8 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
|
|
|
|
{
|
|
|
|
|
NeonResponse *resp;
|
|
|
|
|
BlockNumber n_blocks;
|
|
|
|
|
bool latest;
|
|
|
|
|
XLogRecPtr request_lsn;
|
|
|
|
|
XLogRecPtr not_modified_since;
|
|
|
|
|
|
|
|
|
|
switch (reln->smgr_relpersistence)
|
|
|
|
|
{
|
|
|
|
|
@@ -2470,12 +2511,13 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
|
|
|
|
return n_blocks;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
request_lsn = neon_get_request_lsn(&latest, InfoFromSMgrRel(reln), forknum, REL_METADATA_PSEUDO_BLOCKNO);
|
|
|
|
|
neon_get_request_lsn(InfoFromSMgrRel(reln), forknum, REL_METADATA_PSEUDO_BLOCKNO,
|
|
|
|
|
&request_lsn, ¬_modified_since);
|
|
|
|
|
{
|
|
|
|
|
NeonNblocksRequest request = {
|
|
|
|
|
.req.tag = T_NeonNblocksRequest,
|
|
|
|
|
.req.latest = latest,
|
|
|
|
|
.req.lsn = request_lsn,
|
|
|
|
|
.req.not_modified_since = not_modified_since,
|
|
|
|
|
.rinfo = InfoFromSMgrRel(reln),
|
|
|
|
|
.forknum = forknum,
|
|
|
|
|
};
|
|
|
|
|
@@ -2523,16 +2565,17 @@ neon_dbsize(Oid dbNode)
|
|
|
|
|
{
|
|
|
|
|
NeonResponse *resp;
|
|
|
|
|
int64 db_size;
|
|
|
|
|
XLogRecPtr request_lsn;
|
|
|
|
|
bool latest;
|
|
|
|
|
XLogRecPtr request_lsn,
|
|
|
|
|
not_modified_since;
|
|
|
|
|
NRelFileInfo dummy_node = {0};
|
|
|
|
|
|
|
|
|
|
request_lsn = neon_get_request_lsn(&latest, dummy_node, MAIN_FORKNUM, REL_METADATA_PSEUDO_BLOCKNO);
|
|
|
|
|
neon_get_request_lsn(dummy_node, MAIN_FORKNUM, REL_METADATA_PSEUDO_BLOCKNO,
|
|
|
|
|
&request_lsn, ¬_modified_since);
|
|
|
|
|
{
|
|
|
|
|
NeonDbSizeRequest request = {
|
|
|
|
|
.req.tag = T_NeonDbSizeRequest,
|
|
|
|
|
.req.latest = latest,
|
|
|
|
|
.req.lsn = request_lsn,
|
|
|
|
|
.req.not_modified_since = not_modified_since,
|
|
|
|
|
.dbNode = dbNode,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
@@ -2605,7 +2648,6 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
|
|
|
|
|
* the most recently inserted WAL record's LSN.
|
|
|
|
|
*/
|
|
|
|
|
lsn = GetXLogInsertRecPtr();
|
|
|
|
|
|
|
|
|
|
lsn = nm_adjust_lsn(lsn);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
@@ -2805,14 +2847,23 @@ neon_end_unlogged_build(SMgrRelation reln)
|
|
|
|
|
static int
|
|
|
|
|
neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buffer)
|
|
|
|
|
{
|
|
|
|
|
XLogRecPtr request_lsn;
|
|
|
|
|
/*
|
|
|
|
|
* GetRedoStartLsn() returns LSN of basebackup.
|
|
|
|
|
* We need to download SLRU segments only once after node startup,
|
|
|
|
|
* then SLRUs are maintained locally.
|
|
|
|
|
*/
|
|
|
|
|
request_lsn = GetRedoStartLsn();
|
|
|
|
|
XLogRecPtr request_lsn,
|
|
|
|
|
not_modified_since;
|
|
|
|
|
|
|
|
|
|
if (RecoveryInProgress())
|
|
|
|
|
request_lsn = GetXLogReplayRecPtr(NULL);
|
|
|
|
|
else
|
|
|
|
|
request_lsn = GetXLogInsertRecPtr();
|
|
|
|
|
request_lsn = nm_adjust_lsn(request_lsn);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* GetRedoStartLsn() returns LSN of basebackup. We know that the SLRU
|
|
|
|
|
* segment has not changed since the basebackup, because in order to
|
|
|
|
|
* modify it, we would have had to download it already. And once
|
|
|
|
|
* downloaded, we never evict SLRU segments from local disk.
|
|
|
|
|
*/
|
|
|
|
|
not_modified_since = GetRedoStartLsn();
|
|
|
|
|
|
|
|
|
|
SlruKind kind;
|
|
|
|
|
|
|
|
|
|
if (STRPREFIX(path, "pg_xact"))
|
|
|
|
|
@@ -2827,8 +2878,8 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
|
|
|
|
|
NeonResponse *resp;
|
|
|
|
|
NeonGetSlruSegmentRequest request = {
|
|
|
|
|
.req.tag = T_NeonGetSlruSegmentRequest,
|
|
|
|
|
.req.latest = false,
|
|
|
|
|
.req.lsn = request_lsn,
|
|
|
|
|
.req.not_modified_since = not_modified_since,
|
|
|
|
|
|
|
|
|
|
.kind = kind,
|
|
|
|
|
.segno = segno
|
|
|
|
|
@@ -2956,6 +3007,9 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
|
|
|
|
{
|
|
|
|
|
BlockNumber relsize;
|
|
|
|
|
|
|
|
|
|
/* This is only used in WAL replay */
|
|
|
|
|
Assert(RecoveryInProgress());
|
|
|
|
|
|
|
|
|
|
/* Extend the relation if we know its size */
|
|
|
|
|
if (get_cached_relsize(rinfo, forknum, &relsize))
|
|
|
|
|
{
|
|
|
|
|
@@ -2974,13 +3028,12 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
|
|
|
|
* This length is later reused when we open the smgr to read the
|
|
|
|
|
* block, which is fine and expected.
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
NeonResponse *response;
|
|
|
|
|
NeonNblocksResponse *nbresponse;
|
|
|
|
|
NeonNblocksRequest request = {
|
|
|
|
|
.req = (NeonRequest) {
|
|
|
|
|
.lsn = end_recptr,
|
|
|
|
|
.latest = false,
|
|
|
|
|
.not_modified_since = end_recptr,
|
|
|
|
|
.tag = T_NeonNblocksRequest,
|
|
|
|
|
},
|
|
|
|
|
.rinfo = rinfo,
|
|
|
|
|
|