mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 22:20:37 +00:00
Track not_modified_since, client-support for protocol V2
This commit is contained in:
@@ -49,6 +49,8 @@ char *neon_auth_token;
|
||||
int readahead_buffer_size = 128;
|
||||
int flush_every_n_requests = 8;
|
||||
|
||||
int neon_protocol_version;
|
||||
|
||||
static int n_reconnect_attempts = 0;
|
||||
static int max_reconnect_attempts = 60;
|
||||
static int stripe_size;
|
||||
@@ -844,6 +846,14 @@ pg_init_libpagestore(void)
|
||||
PGC_USERSET,
|
||||
0, /* no flags required */
|
||||
NULL, (GucIntAssignHook) &readahead_buffer_resize, NULL);
|
||||
DefineCustomIntVariable("neon.protocol_version",
|
||||
"Version of compute<->page server protocol",
|
||||
NULL,
|
||||
&neon_protocol_version,
|
||||
NEON_PROTOCOL_VERSION, 1, 2,
|
||||
PGC_USERSET,
|
||||
0, /* no flags required */
|
||||
NULL, NULL, NULL);
|
||||
|
||||
relsize_hash_init();
|
||||
|
||||
|
||||
@@ -28,6 +28,13 @@
|
||||
#define MAX_SHARDS 128
|
||||
#define MAX_PAGESERVER_CONNSTRING_SIZE 256
|
||||
|
||||
/*
|
||||
* Currently, the protocol version is not sent to the server.
|
||||
* So it is critical that format of existing commands is not changed.
|
||||
* New protocol versions can just add new commands.
|
||||
*/
|
||||
#define NEON_PROTOCOL_VERSION 2
|
||||
|
||||
typedef enum
|
||||
{
|
||||
/* pagestore_client -> pagestore */
|
||||
@@ -37,6 +44,12 @@ typedef enum
|
||||
T_NeonDbSizeRequest,
|
||||
T_NeonGetSlruSegmentRequest,
|
||||
|
||||
T_NeonExistsV2Request = 10, /* new protocol message tags start from 10 */
|
||||
T_NeonNblocksV2Request,
|
||||
T_NeonGetPageV2Request,
|
||||
T_NeonDbSizeV2Request,
|
||||
T_NeonGetSlruSegmentV2Request,
|
||||
|
||||
/* pagestore -> pagestore_client */
|
||||
T_NeonExistsResponse = 100,
|
||||
T_NeonNblocksResponse,
|
||||
@@ -69,18 +82,33 @@ typedef enum {
|
||||
SLRU_MULTIXACT_OFFSETS
|
||||
} SlruKind;
|
||||
|
||||
/*
|
||||
* supertype of all the Neon*Request structs below
|
||||
/*--
|
||||
* supertype of all the Neon*Request structs below.
|
||||
*
|
||||
* If 'latest' is true, we are requesting the latest page version, and 'lsn'
|
||||
* is just a hint to the server that we know there are no versions of the page
|
||||
* (or relation size, for exists/nblocks requests) later than the 'lsn'.
|
||||
* All requests contain two LSNs:
|
||||
*
|
||||
* lsn: request page (or relation size, etc) at this LSN
|
||||
* not_modified_since: Hint that the page hasn't been modified between
|
||||
* this LSN and the request LSN (`lsn`).
|
||||
*
|
||||
* To request the latest version of a page, you can use MAX_LSN as the request
|
||||
* LSN.
|
||||
*
|
||||
* If you don't know any better, you can always set 'not_modified_since' equal
|
||||
* to 'lsn', but providing a lower value can speed up processing the request
|
||||
* in the pageserver, as it doesn't need to wait for the WAL to arrive, and it
|
||||
* can skip traversing through recent layers which we know to not contain any
|
||||
* versions for the requested page.
|
||||
*
|
||||
* These structs describe the V2 of these requests. The old V1 protocol contained
|
||||
* just one LSN and a boolean 'latest' flag. If the neon_protocol_version GUC is
|
||||
* set to 1, we will convert these to the V1 requests before sending.
|
||||
*/
|
||||
typedef struct
|
||||
{
|
||||
NeonMessageTag tag;
|
||||
bool latest; /* if true, request latest page version */
|
||||
XLogRecPtr lsn; /* request page version @ this LSN */
|
||||
XLogRecPtr lsn;
|
||||
XLogRecPtr not_modified_since;
|
||||
} NeonRequest;
|
||||
|
||||
typedef struct
|
||||
@@ -193,6 +221,7 @@ extern int readahead_buffer_size;
|
||||
extern char *neon_timeline;
|
||||
extern char *neon_tenant;
|
||||
extern int32 max_cluster_size;
|
||||
extern int neon_protocol_version;
|
||||
|
||||
extern shardno_t get_shard_number(BufferTag* tag);
|
||||
|
||||
@@ -225,14 +254,14 @@ extern bool neon_prefetch(SMgrRelation reln, ForkNumber forknum,
|
||||
extern void neon_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
char *buffer);
|
||||
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
XLogRecPtr request_lsn, bool request_latest, char *buffer);
|
||||
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, char *buffer);
|
||||
extern void neon_write(SMgrRelation reln, ForkNumber forknum,
|
||||
BlockNumber blocknum, char *buffer, bool skipFsync);
|
||||
#else
|
||||
extern void neon_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
void *buffer);
|
||||
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
XLogRecPtr request_lsn, bool request_latest, void *buffer);
|
||||
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, void *buffer);
|
||||
extern void neon_write(SMgrRelation reln, ForkNumber forknum,
|
||||
BlockNumber blocknum, const void *buffer, bool skipFsync);
|
||||
#endif
|
||||
|
||||
@@ -168,8 +168,8 @@ typedef enum PrefetchStatus
|
||||
typedef struct PrefetchRequest
|
||||
{
|
||||
BufferTag buftag; /* must be first entry in the struct */
|
||||
XLogRecPtr effective_request_lsn;
|
||||
XLogRecPtr actual_request_lsn;
|
||||
XLogRecPtr request_lsn;
|
||||
XLogRecPtr not_modified_since;
|
||||
NeonResponse *response; /* may be null */
|
||||
PrefetchStatus status;
|
||||
shardno_t shard_no;
|
||||
@@ -269,19 +269,17 @@ static PrefetchState *MyPState;
|
||||
) \
|
||||
)
|
||||
|
||||
static XLogRecPtr prefetch_lsn = 0;
|
||||
|
||||
static bool compact_prefetch_buffers(void);
|
||||
static void consume_prefetch_responses(void);
|
||||
static uint64 prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn);
|
||||
static uint64 prefetch_register_buffer(BufferTag tag, XLogRecPtr *force_request_lsn, XLogRecPtr *force_not_modified_since);
|
||||
static bool prefetch_read(PrefetchRequest *slot);
|
||||
static void prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn);
|
||||
static void prefetch_do_request(PrefetchRequest *slot, XLogRecPtr *force_request_lsn, XLogRecPtr *force_not_modified_since);
|
||||
static bool prefetch_wait_for(uint64 ring_index);
|
||||
static void prefetch_cleanup_trailing_unused(void);
|
||||
static inline void prefetch_set_unused(uint64 ring_index);
|
||||
|
||||
static XLogRecPtr neon_get_request_lsn(bool *latest, NRelFileInfo rinfo,
|
||||
ForkNumber forknum, BlockNumber blkno);
|
||||
static void neon_get_request_lsn(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
XLogRecPtr *request_lsn, XLogRecPtr *not_modified_since);
|
||||
|
||||
static bool
|
||||
compact_prefetch_buffers(void)
|
||||
@@ -338,8 +336,8 @@ compact_prefetch_buffers(void)
|
||||
target_slot->shard_no = source_slot->shard_no;
|
||||
target_slot->status = source_slot->status;
|
||||
target_slot->response = source_slot->response;
|
||||
target_slot->effective_request_lsn = source_slot->effective_request_lsn;
|
||||
target_slot->actual_request_lsn = source_slot->actual_request_lsn;
|
||||
target_slot->request_lsn = source_slot->request_lsn;
|
||||
target_slot->not_modified_since = source_slot->not_modified_since;
|
||||
target_slot->my_ring_index = empty_ring_index;
|
||||
|
||||
prfh_delete(MyPState->prf_hash, source_slot);
|
||||
@@ -358,7 +356,8 @@ compact_prefetch_buffers(void)
|
||||
};
|
||||
source_slot->response = NULL;
|
||||
source_slot->my_ring_index = 0;
|
||||
source_slot->effective_request_lsn = 0;
|
||||
source_slot->request_lsn = InvalidXLogRecPtr;
|
||||
source_slot->not_modified_since = InvalidXLogRecPtr;
|
||||
|
||||
/* update bookkeeping */
|
||||
n_moved++;
|
||||
@@ -684,54 +683,35 @@ prefetch_set_unused(uint64 ring_index)
|
||||
}
|
||||
|
||||
static void
|
||||
prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn)
|
||||
prefetch_do_request(PrefetchRequest *slot, XLogRecPtr *force_request_lsn, XLogRecPtr *force_not_modified_since)
|
||||
{
|
||||
bool found;
|
||||
NeonGetPageRequest request = {
|
||||
.req.tag = T_NeonGetPageRequest,
|
||||
.req.latest = false,
|
||||
.req.lsn = 0,
|
||||
/* lsn and not_modified_since are filled in below */
|
||||
.rinfo = BufTagGetNRelFileInfo(slot->buftag),
|
||||
.forknum = slot->buftag.forkNum,
|
||||
.blkno = slot->buftag.blockNum,
|
||||
};
|
||||
|
||||
if (force_lsn && force_latest)
|
||||
Assert(((force_request_lsn != NULL) == (force_not_modified_since != NULL)));
|
||||
|
||||
if (force_request_lsn)
|
||||
{
|
||||
request.req.lsn = *force_lsn;
|
||||
request.req.latest = *force_latest;
|
||||
slot->actual_request_lsn = slot->effective_request_lsn = *force_lsn;
|
||||
request.req.lsn = *force_request_lsn;
|
||||
request.req.not_modified_since = *force_not_modified_since;
|
||||
slot->request_lsn = *force_request_lsn;
|
||||
slot->not_modified_since = *force_not_modified_since;
|
||||
}
|
||||
else
|
||||
{
|
||||
XLogRecPtr lsn = neon_get_request_lsn(
|
||||
&request.req.latest,
|
||||
BufTagGetNRelFileInfo(slot->buftag),
|
||||
slot->buftag.forkNum,
|
||||
slot->buftag.blockNum
|
||||
);
|
||||
|
||||
/*
|
||||
* Note: effective_request_lsn is potentially higher than the
|
||||
* requested LSN, but still correct:
|
||||
*
|
||||
* We know there are no changes between the actual requested LSN and
|
||||
* the value of effective_request_lsn: If there were, the page would
|
||||
* have been in cache and evicted between those LSN values, which then
|
||||
* would have had to result in a larger request LSN for this page.
|
||||
*
|
||||
* It is possible that a concurrent backend loads the page, modifies
|
||||
* it and then evicts it again, but the LSN of that eviction cannot be
|
||||
* smaller than the current WAL insert/redo pointer, which is already
|
||||
* larger than this prefetch_lsn. So in any case, that would
|
||||
* invalidate this cache.
|
||||
*
|
||||
* The best LSN to use for effective_request_lsn would be
|
||||
* XLogCtl->Insert.RedoRecPtr, but that's expensive to access.
|
||||
*/
|
||||
slot->actual_request_lsn = request.req.lsn = lsn;
|
||||
prefetch_lsn = Max(prefetch_lsn, lsn);
|
||||
slot->effective_request_lsn = prefetch_lsn;
|
||||
neon_get_request_lsn(BufTagGetNRelFileInfo(slot->buftag),
|
||||
slot->buftag.forkNum,
|
||||
slot->buftag.blockNum,
|
||||
&request.req.lsn,
|
||||
&request.req.not_modified_since);
|
||||
slot->request_lsn = request.req.lsn;
|
||||
slot->not_modified_since = request.req.not_modified_since;
|
||||
}
|
||||
|
||||
Assert(slot->response == NULL);
|
||||
@@ -749,7 +729,6 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
|
||||
/* update slot state */
|
||||
slot->status = PRFS_REQUESTED;
|
||||
|
||||
|
||||
prfh_insert(MyPState->prf_hash, slot, &found);
|
||||
Assert(!found);
|
||||
}
|
||||
@@ -759,22 +738,25 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
|
||||
*
|
||||
* Register that we may want the contents of BufferTag in the near future.
|
||||
*
|
||||
* If force_latest and force_lsn are not NULL, those values are sent to the
|
||||
* pageserver. If they are NULL, we utilize the lastWrittenLsn -infrastructure
|
||||
* to fill in these values manually.
|
||||
* If force_request_lsn and force_not_modified_since are not NULL, those
|
||||
* values are sent to the pageserver. If they are NULL, we utilize the
|
||||
* lastWrittenLsn -infrastructure to fill them in.
|
||||
*
|
||||
* NOTE: this function may indirectly update MyPState->pfs_hash; which
|
||||
* invalidates any active pointers into the hash table.
|
||||
*/
|
||||
|
||||
static uint64
|
||||
prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn)
|
||||
prefetch_register_buffer(BufferTag tag, XLogRecPtr *force_request_lsn,
|
||||
XLogRecPtr *force_not_modified_since)
|
||||
{
|
||||
uint64 ring_index;
|
||||
PrefetchRequest req;
|
||||
PrefetchRequest *slot;
|
||||
PrfHashEntry *entry;
|
||||
|
||||
Assert(((force_request_lsn != NULL) == (force_not_modified_since != NULL)));
|
||||
|
||||
/* use an intermediate PrefetchRequest struct to ensure correct alignment */
|
||||
req.buftag = tag;
|
||||
Retry:
|
||||
@@ -795,37 +777,31 @@ Retry:
|
||||
* If we want a specific lsn, we do not accept requests that were made
|
||||
* with a potentially different LSN.
|
||||
*/
|
||||
if (force_latest && force_lsn)
|
||||
if (force_request_lsn)
|
||||
{
|
||||
/*
|
||||
* if we want the latest version, any effective_request_lsn <
|
||||
* request lsn is OK
|
||||
* The not_changed_since..request_lsn range of each request is
|
||||
* effectively a claim that the page has not been modified between
|
||||
* whose LSNs. Therefore, if the range of the old request in the
|
||||
* queue overlaps with the new request, we know that the the page
|
||||
* hasn't been modified in the union of the ranges. We can reuse
|
||||
* the old request in that case.
|
||||
*
|
||||
* The new request's LSN should never be older than the old one,
|
||||
* so don't bother checking that case.
|
||||
*/
|
||||
if (*force_latest)
|
||||
if (*force_request_lsn >= slot->not_modified_since &&
|
||||
*force_not_modified_since <= slot->request_lsn)
|
||||
{
|
||||
if (*force_lsn > slot->effective_request_lsn)
|
||||
{
|
||||
if (!prefetch_wait_for(ring_index))
|
||||
goto Retry;
|
||||
prefetch_set_unused(ring_index);
|
||||
entry = NULL;
|
||||
}
|
||||
|
||||
/* the old request overlaps with the new one; keep it */
|
||||
}
|
||||
|
||||
/*
|
||||
* if we don't want the latest version, only accept requests with
|
||||
* the exact same LSN
|
||||
*/
|
||||
else
|
||||
{
|
||||
if (*force_lsn != slot->effective_request_lsn)
|
||||
{
|
||||
if (!prefetch_wait_for(ring_index))
|
||||
goto Retry;
|
||||
prefetch_set_unused(ring_index);
|
||||
entry = NULL;
|
||||
}
|
||||
/* Wait for the old request to finish and discard it */
|
||||
if (!prefetch_wait_for(ring_index))
|
||||
goto Retry;
|
||||
prefetch_set_unused(ring_index);
|
||||
entry = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -921,7 +897,7 @@ Retry:
|
||||
slot->shard_no = get_shard_number(&tag);
|
||||
slot->my_ring_index = ring_index;
|
||||
|
||||
prefetch_do_request(slot, force_latest, force_lsn);
|
||||
prefetch_do_request(slot, force_request_lsn, force_not_modified_since);
|
||||
Assert(slot->status == PRFS_REQUESTED);
|
||||
Assert(MyPState->ring_last <= ring_index &&
|
||||
ring_index < MyPState->ring_unused);
|
||||
@@ -997,7 +973,66 @@ nm_pack_request(NeonRequest *msg)
|
||||
StringInfoData s;
|
||||
|
||||
initStringInfo(&s);
|
||||
pq_sendbyte(&s, msg->tag);
|
||||
|
||||
if (neon_protocol_version >= 2)
|
||||
{
|
||||
pq_sendbyte(&s, msg->tag);
|
||||
pq_sendint64(&s, msg->lsn);
|
||||
pq_sendint64(&s, msg->not_modified_since);
|
||||
}
|
||||
else
|
||||
{
|
||||
NeonMessageTag tag;
|
||||
bool latest;
|
||||
XLogRecPtr lsn;
|
||||
|
||||
/*
|
||||
* In primary, we always request the latest page version.
|
||||
*/
|
||||
if (!RecoveryInProgress())
|
||||
{
|
||||
latest = true;
|
||||
lsn = msg->not_modified_since;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* In the current protocol, we cannot represent that we want to read
|
||||
* page at LSN X, and we know that it hasn't been modified since Y. We
|
||||
* can either use 'not_modified_lsn' as the request LSN, and risk
|
||||
* getting an error if that LSN is too old and has already fallen out
|
||||
* of the pageserver's GC horizon, or we can send 'request_lsn',
|
||||
* causing the pageserver to possibly wait for the recent WAL to
|
||||
* arrive unnecessarily. Or something in between. We choose to use the
|
||||
* old LSN and risk GC errors, because that's what we've done
|
||||
* historically.
|
||||
*/
|
||||
latest = false;
|
||||
lsn = msg->not_modified_since;
|
||||
}
|
||||
|
||||
switch(msg->tag)
|
||||
{
|
||||
case T_NeonExistsV2Request:
|
||||
tag = T_NeonExistsRequest;
|
||||
break;
|
||||
case T_NeonNblocksV2Request:
|
||||
tag = T_NeonNblocksRequest;
|
||||
break;
|
||||
case T_NeonGetPageV2Request:
|
||||
tag = T_NeonGetPageRequest;
|
||||
break;
|
||||
case T_NeonDbSizeV2Request:
|
||||
tag = T_NeonDbSizeRequest;
|
||||
break;
|
||||
case T_NeonGetSlruSegmentV2Request:
|
||||
tag = T_NeonGetSlruSegmentRequest;
|
||||
break;
|
||||
}
|
||||
pq_sendbyte(&s, tag);
|
||||
pq_sendbyte(&s, latest);
|
||||
pq_sendint64(&s, lsn);
|
||||
}
|
||||
|
||||
switch (messageTag(msg))
|
||||
{
|
||||
@@ -1006,8 +1041,6 @@ nm_pack_request(NeonRequest *msg)
|
||||
{
|
||||
NeonExistsRequest *msg_req = (NeonExistsRequest *) msg;
|
||||
|
||||
pq_sendbyte(&s, msg_req->req.latest);
|
||||
pq_sendint64(&s, msg_req->req.lsn);
|
||||
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
|
||||
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
|
||||
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
|
||||
@@ -1019,8 +1052,6 @@ nm_pack_request(NeonRequest *msg)
|
||||
{
|
||||
NeonNblocksRequest *msg_req = (NeonNblocksRequest *) msg;
|
||||
|
||||
pq_sendbyte(&s, msg_req->req.latest);
|
||||
pq_sendint64(&s, msg_req->req.lsn);
|
||||
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
|
||||
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
|
||||
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
|
||||
@@ -1032,8 +1063,6 @@ nm_pack_request(NeonRequest *msg)
|
||||
{
|
||||
NeonDbSizeRequest *msg_req = (NeonDbSizeRequest *) msg;
|
||||
|
||||
pq_sendbyte(&s, msg_req->req.latest);
|
||||
pq_sendint64(&s, msg_req->req.lsn);
|
||||
pq_sendint32(&s, msg_req->dbNode);
|
||||
|
||||
break;
|
||||
@@ -1042,8 +1071,6 @@ nm_pack_request(NeonRequest *msg)
|
||||
{
|
||||
NeonGetPageRequest *msg_req = (NeonGetPageRequest *) msg;
|
||||
|
||||
pq_sendbyte(&s, msg_req->req.latest);
|
||||
pq_sendint64(&s, msg_req->req.lsn);
|
||||
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
|
||||
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
|
||||
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
|
||||
@@ -1057,8 +1084,6 @@ nm_pack_request(NeonRequest *msg)
|
||||
{
|
||||
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
|
||||
|
||||
pq_sendbyte(&s, msg_req->req.latest);
|
||||
pq_sendint64(&s, msg_req->req.lsn);
|
||||
pq_sendbyte(&s, msg_req->kind);
|
||||
pq_sendint32(&s, msg_req->segno);
|
||||
|
||||
@@ -1209,7 +1234,7 @@ nm_to_string(NeonMessage *msg)
|
||||
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
|
||||
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
|
||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
||||
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
@@ -1222,7 +1247,7 @@ nm_to_string(NeonMessage *msg)
|
||||
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
|
||||
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
|
||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
||||
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
@@ -1236,7 +1261,7 @@ nm_to_string(NeonMessage *msg)
|
||||
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
|
||||
appendStringInfo(&s, ", \"blkno\": %u", msg_req->blkno);
|
||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
||||
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
@@ -1247,7 +1272,7 @@ nm_to_string(NeonMessage *msg)
|
||||
appendStringInfoString(&s, "{\"type\": \"NeonDbSizeRequest\"");
|
||||
appendStringInfo(&s, ", \"dbnode\": \"%u\"", msg_req->dbNode);
|
||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
||||
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
@@ -1259,7 +1284,7 @@ nm_to_string(NeonMessage *msg)
|
||||
appendStringInfo(&s, ", \"kind\": %u", msg_req->kind);
|
||||
appendStringInfo(&s, ", \"segno\": %u", msg_req->segno);
|
||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
||||
appendStringInfo(&s, ", \"not_modified_since\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.not_modified_since));
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
@@ -1531,44 +1556,36 @@ nm_adjust_lsn(XLogRecPtr lsn)
|
||||
/*
|
||||
* Return LSN for requesting pages and number of blocks from page server
|
||||
*/
|
||||
static XLogRecPtr
|
||||
neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno)
|
||||
static void
|
||||
neon_get_request_lsn(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
XLogRecPtr *request_lsn, XLogRecPtr *not_modified_since)
|
||||
{
|
||||
XLogRecPtr lsn;
|
||||
|
||||
if (RecoveryInProgress())
|
||||
{
|
||||
/*
|
||||
* We don't know if WAL has been generated but not yet replayed, so
|
||||
* we're conservative in our estimates about latest pages.
|
||||
*/
|
||||
*latest = false;
|
||||
/* Request the page at the last replayed LSN. */
|
||||
*request_lsn = GetXLogReplayRecPtr(NULL);
|
||||
*not_modified_since = GetLastWrittenLSN(rinfo, forknum, blkno);
|
||||
|
||||
/*
|
||||
* Get the last written LSN of this page.
|
||||
*/
|
||||
lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
|
||||
lsn = nm_adjust_lsn(lsn);
|
||||
|
||||
neon_log(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ",
|
||||
(uint32) ((lsn) >> 32), (uint32) (lsn));
|
||||
neon_log(DEBUG1, "neon_get_request_lsn request lsn %X/%X, not_modified_since %X/%X",
|
||||
LSN_FORMAT_ARGS(*request_lsn), LSN_FORMAT_ARGS(*not_modified_since));
|
||||
}
|
||||
else
|
||||
{
|
||||
XLogRecPtr last_written_lsn;
|
||||
XLogRecPtr flushlsn;
|
||||
|
||||
/*
|
||||
* Use the latest LSN that was evicted from the buffer cache. Any
|
||||
* pages modified by later WAL records must still in the buffer cache,
|
||||
* so our request cannot concern those.
|
||||
* Use the latest LSN that was evicted from the buffer cache as the
|
||||
* 'not_modified_since' hint. Any pages modified by later WAL records
|
||||
* must still in the buffer cache, so our request cannot concern
|
||||
* those.
|
||||
*/
|
||||
*latest = true;
|
||||
lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
|
||||
Assert(lsn != InvalidXLogRecPtr);
|
||||
last_written_lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
|
||||
Assert(last_written_lsn != InvalidXLogRecPtr);
|
||||
neon_log(DEBUG1, "neon_get_request_lsn GetLastWrittenLSN lsn %X/%X ",
|
||||
(uint32) ((lsn) >> 32), (uint32) (lsn));
|
||||
LSN_FORMAT_ARGS(last_written_lsn));
|
||||
|
||||
lsn = nm_adjust_lsn(lsn);
|
||||
last_written_lsn = nm_adjust_lsn(last_written_lsn);
|
||||
|
||||
/*
|
||||
* Is it possible that the last-written LSN is ahead of last flush
|
||||
@@ -1583,16 +1600,25 @@ neon_get_request_lsn(bool *latest, NRelFileInfo rinfo, ForkNumber forknum, Block
|
||||
#else
|
||||
flushlsn = GetFlushRecPtr();
|
||||
#endif
|
||||
if (lsn > flushlsn)
|
||||
if (last_written_lsn > flushlsn)
|
||||
{
|
||||
neon_log(DEBUG5, "last-written LSN %X/%X is ahead of last flushed LSN %X/%X",
|
||||
(uint32) (lsn >> 32), (uint32) lsn,
|
||||
(uint32) (flushlsn >> 32), (uint32) flushlsn);
|
||||
XLogFlush(lsn);
|
||||
LSN_FORMAT_ARGS(last_written_lsn),
|
||||
LSN_FORMAT_ARGS(flushlsn));
|
||||
XLogFlush(last_written_lsn);
|
||||
flushlsn = last_written_lsn;
|
||||
}
|
||||
}
|
||||
|
||||
return lsn;
|
||||
/*
|
||||
* Request the latest version of the page. The most up-to-date request
|
||||
* LSN we could use would be the current insert LSN, but to avoid the
|
||||
* overhead of looking it up, use 'flushlsn' instead. This relies on the
|
||||
* assumption that if the page was modified since the last WAL flush, it
|
||||
* should still be in the buffer cache, and we wouldn't be requesting it.
|
||||
*/
|
||||
*request_lsn = flushlsn;
|
||||
*not_modified_since = last_written_lsn;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1604,8 +1630,8 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
|
||||
bool exists;
|
||||
NeonResponse *resp;
|
||||
BlockNumber n_blocks;
|
||||
bool latest;
|
||||
XLogRecPtr request_lsn;
|
||||
XLogRecPtr not_modified_since;
|
||||
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
@@ -1660,12 +1686,13 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
|
||||
return false;
|
||||
}
|
||||
|
||||
request_lsn = neon_get_request_lsn(&latest, InfoFromSMgrRel(reln), forkNum, REL_METADATA_PSEUDO_BLOCKNO);
|
||||
neon_get_request_lsn(InfoFromSMgrRel(reln), forkNum, REL_METADATA_PSEUDO_BLOCKNO,
|
||||
&request_lsn, ¬_modified_since);
|
||||
{
|
||||
NeonExistsRequest request = {
|
||||
.req.tag = T_NeonExistsRequest,
|
||||
.req.latest = latest,
|
||||
.req.lsn = request_lsn,
|
||||
.req.not_modified_since = not_modified_since,
|
||||
.rinfo = InfoFromSMgrRel(reln),
|
||||
.forknum = forkNum};
|
||||
|
||||
@@ -2102,10 +2129,10 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
|
||||
void
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
XLogRecPtr request_lsn, bool request_latest, char *buffer)
|
||||
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, char *buffer)
|
||||
#else
|
||||
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
XLogRecPtr request_lsn, bool request_latest, void *buffer)
|
||||
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, void *buffer)
|
||||
#endif
|
||||
{
|
||||
NeonResponse *resp;
|
||||
@@ -2148,15 +2175,28 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
if (entry != NULL)
|
||||
{
|
||||
slot = entry->slot;
|
||||
if (slot->effective_request_lsn >= request_lsn)
|
||||
/*
|
||||
* The not_changed_since..request_lsn range of each request is
|
||||
* effectively a claim that the page has not been modified between
|
||||
* those LSNs. Therefore, if the range of the old request in the queue
|
||||
* overlaps with the new request, we know that the the page hasn't
|
||||
* been modified in the union of the ranges. We can reuse the old
|
||||
* request in that case.
|
||||
*
|
||||
* The new request's LSN should never be older than the old one,
|
||||
* so don't bother checking that case.
|
||||
*/
|
||||
if (request_lsn >= slot->not_modified_since &&
|
||||
not_modified_since <= slot->request_lsn)
|
||||
{
|
||||
ring_index = slot->my_ring_index;
|
||||
pgBufferUsage.prefetch.hits += 1;
|
||||
}
|
||||
else /* the current prefetch LSN is not large
|
||||
* enough, so drop the prefetch */
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Cannot use this prefetch, discard it
|
||||
*
|
||||
* We can't drop cache for not-yet-received requested items. It is
|
||||
* unlikely this happens, but it can happen if prefetch distance
|
||||
* is large enough and a backend didn't consume all prefetch
|
||||
@@ -2181,8 +2221,8 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
{
|
||||
pgBufferUsage.prefetch.misses += 1;
|
||||
|
||||
ring_index = prefetch_register_buffer(buftag, &request_latest,
|
||||
&request_lsn);
|
||||
ring_index = prefetch_register_buffer(buftag, &request_lsn,
|
||||
¬_modified_since);
|
||||
slot = GetPrfSlot(ring_index);
|
||||
}
|
||||
else
|
||||
@@ -2246,8 +2286,8 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, char *buffer
|
||||
neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer)
|
||||
#endif
|
||||
{
|
||||
bool latest;
|
||||
XLogRecPtr request_lsn;
|
||||
XLogRecPtr not_modified_since;
|
||||
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
@@ -2272,8 +2312,9 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
return;
|
||||
}
|
||||
|
||||
request_lsn = neon_get_request_lsn(&latest, InfoFromSMgrRel(reln), forkNum, blkno);
|
||||
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsn, latest, buffer);
|
||||
neon_get_request_lsn(InfoFromSMgrRel(reln), forkNum, blkno,
|
||||
&request_lsn, ¬_modified_since);
|
||||
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsn, not_modified_since, buffer);
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
|
||||
@@ -2442,8 +2483,8 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
{
|
||||
NeonResponse *resp;
|
||||
BlockNumber n_blocks;
|
||||
bool latest;
|
||||
XLogRecPtr request_lsn;
|
||||
XLogRecPtr not_modified_since;
|
||||
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
@@ -2470,12 +2511,13 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
return n_blocks;
|
||||
}
|
||||
|
||||
request_lsn = neon_get_request_lsn(&latest, InfoFromSMgrRel(reln), forknum, REL_METADATA_PSEUDO_BLOCKNO);
|
||||
neon_get_request_lsn(InfoFromSMgrRel(reln), forknum, REL_METADATA_PSEUDO_BLOCKNO,
|
||||
&request_lsn, ¬_modified_since);
|
||||
{
|
||||
NeonNblocksRequest request = {
|
||||
.req.tag = T_NeonNblocksRequest,
|
||||
.req.latest = latest,
|
||||
.req.lsn = request_lsn,
|
||||
.req.not_modified_since = not_modified_since,
|
||||
.rinfo = InfoFromSMgrRel(reln),
|
||||
.forknum = forknum,
|
||||
};
|
||||
@@ -2523,16 +2565,17 @@ neon_dbsize(Oid dbNode)
|
||||
{
|
||||
NeonResponse *resp;
|
||||
int64 db_size;
|
||||
XLogRecPtr request_lsn;
|
||||
bool latest;
|
||||
XLogRecPtr request_lsn,
|
||||
not_modified_since;
|
||||
NRelFileInfo dummy_node = {0};
|
||||
|
||||
request_lsn = neon_get_request_lsn(&latest, dummy_node, MAIN_FORKNUM, REL_METADATA_PSEUDO_BLOCKNO);
|
||||
neon_get_request_lsn(dummy_node, MAIN_FORKNUM, REL_METADATA_PSEUDO_BLOCKNO,
|
||||
&request_lsn, ¬_modified_since);
|
||||
{
|
||||
NeonDbSizeRequest request = {
|
||||
.req.tag = T_NeonDbSizeRequest,
|
||||
.req.latest = latest,
|
||||
.req.lsn = request_lsn,
|
||||
.req.not_modified_since = not_modified_since,
|
||||
.dbNode = dbNode,
|
||||
};
|
||||
|
||||
@@ -2605,7 +2648,6 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
|
||||
* the most recently inserted WAL record's LSN.
|
||||
*/
|
||||
lsn = GetXLogInsertRecPtr();
|
||||
|
||||
lsn = nm_adjust_lsn(lsn);
|
||||
|
||||
/*
|
||||
@@ -2805,14 +2847,23 @@ neon_end_unlogged_build(SMgrRelation reln)
|
||||
static int
|
||||
neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buffer)
|
||||
{
|
||||
XLogRecPtr request_lsn;
|
||||
/*
|
||||
* GetRedoStartLsn() returns LSN of basebackup.
|
||||
* We need to download SLRU segments only once after node startup,
|
||||
* then SLRUs are maintained locally.
|
||||
*/
|
||||
request_lsn = GetRedoStartLsn();
|
||||
XLogRecPtr request_lsn,
|
||||
not_modified_since;
|
||||
|
||||
if (RecoveryInProgress())
|
||||
request_lsn = GetXLogReplayRecPtr(NULL);
|
||||
else
|
||||
request_lsn = GetXLogInsertRecPtr();
|
||||
request_lsn = nm_adjust_lsn(request_lsn);
|
||||
|
||||
/*
|
||||
* GetRedoStartLsn() returns LSN of basebackup. We know that the SLRU
|
||||
* segment has not changed since the basebackup, because in order to
|
||||
* modify it, we would have had to download it already. And once
|
||||
* downloaded, we never evict SLRU segments from local disk.
|
||||
*/
|
||||
not_modified_since = GetRedoStartLsn();
|
||||
|
||||
SlruKind kind;
|
||||
|
||||
if (STRPREFIX(path, "pg_xact"))
|
||||
@@ -2827,8 +2878,8 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
|
||||
NeonResponse *resp;
|
||||
NeonGetSlruSegmentRequest request = {
|
||||
.req.tag = T_NeonGetSlruSegmentRequest,
|
||||
.req.latest = false,
|
||||
.req.lsn = request_lsn,
|
||||
.req.not_modified_since = not_modified_since,
|
||||
|
||||
.kind = kind,
|
||||
.segno = segno
|
||||
@@ -2956,6 +3007,9 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
{
|
||||
BlockNumber relsize;
|
||||
|
||||
/* This is only used in WAL replay */
|
||||
Assert(RecoveryInProgress());
|
||||
|
||||
/* Extend the relation if we know its size */
|
||||
if (get_cached_relsize(rinfo, forknum, &relsize))
|
||||
{
|
||||
@@ -2974,13 +3028,12 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
* This length is later reused when we open the smgr to read the
|
||||
* block, which is fine and expected.
|
||||
*/
|
||||
|
||||
NeonResponse *response;
|
||||
NeonNblocksResponse *nbresponse;
|
||||
NeonNblocksRequest request = {
|
||||
.req = (NeonRequest) {
|
||||
.lsn = end_recptr,
|
||||
.latest = false,
|
||||
.not_modified_since = end_recptr,
|
||||
.tag = T_NeonNblocksRequest,
|
||||
},
|
||||
.rinfo = rinfo,
|
||||
|
||||
@@ -48,10 +48,10 @@ PG_FUNCTION_INFO_V1(neon_xlogflush);
|
||||
*/
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
XLogRecPtr request_lsn, bool request_latest, char *buffer);
|
||||
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, char *buffer);
|
||||
#else
|
||||
typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
XLogRecPtr request_lsn, bool request_latest, void *buffer);
|
||||
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, void *buffer);
|
||||
#endif
|
||||
|
||||
static neon_read_at_lsn_type neon_read_at_lsn_ptr;
|
||||
|
||||
Reference in New Issue
Block a user