Changes of neon extension to support local prefetch (#2369)

* Changes of neon extension to support local prefetch

* Catch exceptions in pageserver_receive

* Bump postgres version

* Bump postgres version

* Bump postgres version

* Bump postgres version
This commit is contained in:
Konstantin Knizhnik
2022-09-13 12:26:20 +03:00
committed by GitHub
parent 4f7557fb58
commit f44afbaf62
5 changed files with 233 additions and 74 deletions

View File

@@ -43,11 +43,6 @@ PGconn *pageserver_conn = NULL;
char *page_server_connstring_raw; char *page_server_connstring_raw;
static ZenithResponse *pageserver_call(ZenithRequest *request);
page_server_api api = {
.request = pageserver_call
};
static void static void
pageserver_connect() pageserver_connect()
{ {
@@ -154,60 +149,86 @@ retry:
} }
static ZenithResponse * static void
pageserver_call(ZenithRequest *request) pageserver_disconnect(void)
{
/*
* If anything goes wrong while we were sending a request, it's not
* clear what state the connection is in. For example, if we sent the
* request but didn't receive a response yet, we might receive the
* response some time later after we have already sent a new unrelated
* request. Close the connection to avoid getting confused.
*/
if (connected)
{
neon_log(LOG, "dropping connection to page server due to error");
PQfinish(pageserver_conn);
pageserver_conn = NULL;
connected = false;
}
}
static void
pageserver_send(ZenithRequest *request)
{ {
StringInfoData req_buff; StringInfoData req_buff;
/* If the connection was lost for some reason, reconnect */
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
{
PQfinish(pageserver_conn);
pageserver_conn = NULL;
connected = false;
}
if (!connected)
pageserver_connect();
req_buff = zm_pack_request(request);
/*
* Send request.
*
* In principle, this could block if the output buffer is full, and we
* should use async mode and check for interrupts while waiting. In
* practice, our requests are small enough to always fit in the output
* and TCP buffer.
*/
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
{
char* msg = PQerrorMessage(pageserver_conn);
pageserver_disconnect();
neon_log(ERROR, "failed to send page request: %s", msg);
}
pfree(req_buff.data);
if (message_level_is_interesting(PageStoreTrace))
{
char *msg = zm_to_string((ZenithMessage *) request);
neon_log(PageStoreTrace, "sent request: %s", msg);
pfree(msg);
}
}
static ZenithResponse *
pageserver_receive(void)
{
StringInfoData resp_buff; StringInfoData resp_buff;
ZenithResponse *resp; ZenithResponse *resp;
PG_TRY(); PG_TRY();
{ {
/* If the connection was lost for some reason, reconnect */
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
{
PQfinish(pageserver_conn);
pageserver_conn = NULL;
connected = false;
}
if (!connected)
pageserver_connect();
req_buff = zm_pack_request(request);
/*
* Send request.
*
* In principle, this could block if the output buffer is full, and we
* should use async mode and check for interrupts while waiting. In
* practice, our requests are small enough to always fit in the output
* and TCP buffer.
*/
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0 || PQflush(pageserver_conn))
{
neon_log(ERROR, "failed to send page request: %s",
PQerrorMessage(pageserver_conn));
}
pfree(req_buff.data);
if (message_level_is_interesting(PageStoreTrace))
{
char *msg = zm_to_string((ZenithMessage *) request);
neon_log(PageStoreTrace, "sent request: %s", msg);
pfree(msg);
}
/* read response */ /* read response */
resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data); resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data);
resp_buff.cursor = 0; resp_buff.cursor = 0;
if (resp_buff.len == -1) if (resp_buff.len < 0)
neon_log(ERROR, "end of COPY"); {
else if (resp_buff.len == -2) if (resp_buff.len == -1)
neon_log(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn)); neon_log(ERROR, "end of COPY");
else if (resp_buff.len == -2)
neon_log(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));
}
resp = zm_unpack_response(&resp_buff); resp = zm_unpack_response(&resp_buff);
PQfreemem(resp_buff.data); PQfreemem(resp_buff.data);
@@ -221,20 +242,7 @@ pageserver_call(ZenithRequest *request)
} }
PG_CATCH(); PG_CATCH();
{ {
/* pageserver_disconnect();
* If anything goes wrong while we were sending a request, it's not
* clear what state the connection is in. For example, if we sent the
* request but didn't receive a response yet, we might receive the
* response some time later after we have already sent a new unrelated
* request. Close the connection to avoid getting confused.
*/
if (connected)
{
neon_log(LOG, "dropping connection to page server due to error");
PQfinish(pageserver_conn);
pageserver_conn = NULL;
connected = false;
}
PG_RE_THROW(); PG_RE_THROW();
} }
PG_END_TRY(); PG_END_TRY();
@@ -243,6 +251,32 @@ pageserver_call(ZenithRequest *request)
} }
/*
 * pageserver_flush - push any buffered page requests out to the page server.
 *
 * On failure the connection is dropped and ERROR is raised.  The libpq error
 * message must be copied with pstrdup() BEFORE disconnecting: the buffer
 * returned by PQerrorMessage() lives inside the PGconn and is freed by
 * PQfinish() in pageserver_disconnect(), so reading it afterwards would be
 * a use-after-free.
 */
static void
pageserver_flush(void)
{
	if (PQflush(pageserver_conn))
	{
		char	   *msg = pstrdup(PQerrorMessage(pageserver_conn));

		pageserver_disconnect();
		neon_log(ERROR, "failed to flush page requests: %s", msg);
	}
}
/*
 * pageserver_call - synchronous round trip to the page server.
 *
 * Sends a single request, flushes the output buffer, and blocks until the
 * matching response arrives.  Errors in any of the three steps raise ERROR
 * (and the helpers drop the connection first).
 */
static ZenithResponse *
pageserver_call(ZenithRequest* request)
{
	pageserver_send(request);
	pageserver_flush();
	return pageserver_receive();
}
/*
 * Callback table handed to the smgr layer.  Besides the classic synchronous
 * .request, the split send/flush/receive entry points let callers pipeline
 * several requests (used by the local prefetch machinery).
 */
page_server_api api = {
	.request = pageserver_call,
	.send = pageserver_send,
	.flush = pageserver_flush,
	.receive = pageserver_receive
};
static bool static bool
check_zenith_id(char **newval, void **extra, GucSource source) check_zenith_id(char **newval, void **extra, GucSource source)
{ {

View File

@@ -142,7 +142,10 @@ extern char *zm_to_string(ZenithMessage *msg);
typedef struct typedef struct
{ {
ZenithResponse *(*request) (ZenithRequest *request); ZenithResponse *(*request) (ZenithRequest *request);
} page_server_api; void (*send) (ZenithRequest *request);
ZenithResponse *(*receive) (void);
void (*flush) (void);
} page_server_api;
extern page_server_api *page_server; extern page_server_api *page_server;
@@ -171,6 +174,7 @@ extern void zenith_extend(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync); BlockNumber blocknum, char *buffer, bool skipFsync);
extern bool zenith_prefetch(SMgrRelation reln, ForkNumber forknum, extern bool zenith_prefetch(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum); BlockNumber blocknum);
extern void zenith_reset_prefetch(SMgrRelation reln);
extern void zenith_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, extern void zenith_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
char *buffer); char *buffer);

View File

@@ -57,6 +57,8 @@
#include "postmaster/interrupt.h" #include "postmaster/interrupt.h"
#include "replication/walsender.h" #include "replication/walsender.h"
#include "storage/bufmgr.h" #include "storage/bufmgr.h"
#include "storage/relfilenode.h"
#include "storage/buf_internals.h"
#include "storage/md.h" #include "storage/md.h"
#include "fmgr.h" #include "fmgr.h"
#include "miscadmin.h" #include "miscadmin.h"
@@ -110,6 +112,49 @@ typedef enum
static SMgrRelation unlogged_build_rel = NULL; static SMgrRelation unlogged_build_rel = NULL;
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS; static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
/*
 * Prefetch implementation:
 * Prefetch is performed locally by each backend.
 * Up to MAX_PREFETCH_REQUESTS blocks can be registered using smgr_prefetch
 * before smgr_read. All these requests are appended to the primary smgr_read request.
 * It is assumed that pages will be requested in prefetch order.
 * Reading of prefetch responses is delayed until they are actually needed (smgr_read).
 * This makes it possible to overlap processing with receiving of prefetched pages.
 * In case of a prefetch miss, or any SMGR request other than smgr_read,
 * all pending prefetch responses have to be consumed.
 */
/* Upper bound on prefetch requests batched before a read. */
#define MAX_PREFETCH_REQUESTS 128

/* Blocks registered via smgr_prefetch but not yet sent to the server. */
BufferTag prefetch_requests[MAX_PREFETCH_REQUESTS];
/* Blocks whose responses are in flight; filled when the batch is sent. */
BufferTag prefetch_responses[MAX_PREFETCH_REQUESTS];
int n_prefetch_requests;	/* number of registered, unsent prefetch requests */
int n_prefetch_responses;	/* number of responses expected from the server */
int n_prefetched_buffers;	/* responses already consumed from the stream */
int n_prefetch_hits;		/* stats: prefetched page satisfied a read */
int n_prefetch_misses;		/* stats: read could not use a prefetched page */
XLogRecPtr prefetch_lsn;	/* request LSN in effect when the batch was sent */
/*
 * Drain every prefetch response that was requested but not yet read, then
 * reset the response bookkeeping.  Called before any request/response
 * exchange that is not a page read, so stale responses are never mistaken
 * for the reply to a new request.
 */
static void
consume_prefetch_responses(void)
{
	int			pending = n_prefetch_responses - n_prefetched_buffers;

	while (pending > 0)
	{
		ZenithResponse *skipped = page_server->receive();

		pfree(skipped);
		pending -= 1;
	}
	n_prefetched_buffers = 0;
	n_prefetch_responses = 0;
}
/*
 * Wrapper around page_server->request() that first drains any outstanding
 * prefetch responses, so the reply we read actually belongs to this request.
 */
static ZenithResponse*
page_server_request(void const* req)
{
	consume_prefetch_responses();
	return page_server->request((ZenithRequest*)req);
}
StringInfoData StringInfoData
zm_pack_request(ZenithRequest *msg) zm_pack_request(ZenithRequest *msg)
{ {
@@ -735,7 +780,7 @@ zenith_exists(SMgrRelation reln, ForkNumber forkNum)
.forknum = forkNum .forknum = forkNum
}; };
resp = page_server->request((ZenithRequest *) &request); resp = page_server_request(&request);
} }
switch (resp->tag) switch (resp->tag)
@@ -948,6 +993,16 @@ zenith_close(SMgrRelation reln, ForkNumber forknum)
mdclose(reln, forknum); mdclose(reln, forknum);
} }
/*
 * zenith_reset_prefetch() -- Remove all previously registered prefetch requests.
 */
void
zenith_reset_prefetch(SMgrRelation reln)
{
	/* reln is unused: prefetch state is per-backend, not per-relation */
	n_prefetch_requests = 0;
}
/* /*
* zenith_prefetch() -- Initiate asynchronous read of the specified block of a relation * zenith_prefetch() -- Initiate asynchronous read of the specified block of a relation
*/ */
@@ -971,9 +1026,15 @@ zenith_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
} }
/* not implemented */ if (n_prefetch_requests < MAX_PREFETCH_REQUESTS)
elog(SmgrTrace, "[ZENITH_SMGR] prefetch noop"); {
return true; prefetch_requests[n_prefetch_requests].rnode = reln->smgr_rnode.node;
prefetch_requests[n_prefetch_requests].forkNum = forknum;
prefetch_requests[n_prefetch_requests].blockNum = blocknum;
n_prefetch_requests += 1;
return true;
}
return false;
} }
/* /*
@@ -1022,7 +1083,47 @@ void zenith_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno
XLogRecPtr request_lsn, bool request_latest, char *buffer) XLogRecPtr request_lsn, bool request_latest, char *buffer)
{ {
ZenithResponse *resp; ZenithResponse *resp;
int i;
/*
 * Try to find the prefetched page.
 * It is assumed that pages will be requested in the same order as they were prefetched,
 * but some other backend may have loaded a page into shared buffers, so some prefetch
 * responses may have to be skipped.
 */
for (i = n_prefetched_buffers; i < n_prefetch_responses; i++)
{
resp = page_server->receive();
if (resp->tag == T_ZenithGetPageResponse &&
RelFileNodeEquals(prefetch_responses[i].rnode, rnode) &&
prefetch_responses[i].forkNum == forkNum &&
prefetch_responses[i].blockNum == blkno)
{
char* page = ((ZenithGetPageResponse *) resp)->page;
/*
 * Check if the prefetched page is still relevant.
 * If it was updated by some other backend, then it should not
 * be requested from smgr unless it has been evicted from shared buffers.
 * In the latter case last_evicted_lsn should be updated and
 * request_lsn should be greater than prefetch_lsn.
 * The maximum with the page LSN is used because a page returned by the
 * page server may have an LSN either greater or smaller than requested.
 */
if (Max(prefetch_lsn, PageGetLSN(page)) >= request_lsn)
{
n_prefetched_buffers = i+1;
n_prefetch_hits += 1;
n_prefetch_requests = 0;
memcpy(buffer, page, BLCKSZ);
pfree(resp);
return;
}
}
pfree(resp);
}
n_prefetched_buffers = 0;
n_prefetch_responses = 0;
n_prefetch_misses += 1;
{ {
ZenithGetPageRequest request = { ZenithGetPageRequest request = {
.req.tag = T_ZenithGetPageRequest, .req.tag = T_ZenithGetPageRequest,
@@ -1032,10 +1133,29 @@ void zenith_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno
.forknum = forkNum, .forknum = forkNum,
.blkno = blkno .blkno = blkno
}; };
if (n_prefetch_requests > 0)
resp = page_server->request((ZenithRequest *) &request); {
/* Combine all prefetch requests with primary request */
page_server->send((ZenithRequest *) &request);
for (i = 0; i < n_prefetch_requests; i++)
{
request.rnode = prefetch_requests[i].rnode;
request.forknum = prefetch_requests[i].forkNum;
request.blkno = prefetch_requests[i].blockNum;
prefetch_responses[i] = prefetch_requests[i];
page_server->send((ZenithRequest *) &request);
}
page_server->flush();
n_prefetch_responses = n_prefetch_requests;
n_prefetch_requests = 0;
prefetch_lsn = request_lsn;
resp = page_server->receive();
}
else
{
resp = page_server->request((ZenithRequest *) &request);
}
} }
switch (resp->tag) switch (resp->tag)
{ {
case T_ZenithGetPageResponse: case T_ZenithGetPageResponse:
@@ -1305,7 +1425,7 @@ zenith_nblocks(SMgrRelation reln, ForkNumber forknum)
.forknum = forknum, .forknum = forknum,
}; };
resp = page_server->request((ZenithRequest *) &request); resp = page_server_request(&request);
} }
switch (resp->tag) switch (resp->tag)
@@ -1365,7 +1485,7 @@ zenith_dbsize(Oid dbNode)
.dbNode = dbNode, .dbNode = dbNode,
}; };
resp = page_server->request((ZenithRequest *) &request); resp = page_server_request(&request);
} }
switch (resp->tag) switch (resp->tag)
@@ -1680,6 +1800,7 @@ static const struct f_smgr zenith_smgr =
.smgr_unlink = zenith_unlink, .smgr_unlink = zenith_unlink,
.smgr_extend = zenith_extend, .smgr_extend = zenith_extend,
.smgr_prefetch = zenith_prefetch, .smgr_prefetch = zenith_prefetch,
.smgr_reset_prefetch = zenith_reset_prefetch,
.smgr_read = zenith_read, .smgr_read = zenith_read,
.smgr_write = zenith_write, .smgr_write = zenith_write,
.smgr_writeback = zenith_writeback, .smgr_writeback = zenith_writeback,