mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-23 14:19:58 +00:00
Changes of neon extension to support local prefetch (#2369)
* Changes of neon extension to support local prefetch * Catch exceptions in pageserver_receive * Bump posgres version * Bump posgres version * Bump posgres version * Bump posgres version
This commit is contained in:
committed by
GitHub
parent
4f7557fb58
commit
f44afbaf62
@@ -43,11 +43,6 @@ PGconn *pageserver_conn = NULL;
|
||||
|
||||
char *page_server_connstring_raw;
|
||||
|
||||
static ZenithResponse *pageserver_call(ZenithRequest *request);
|
||||
page_server_api api = {
|
||||
.request = pageserver_call
|
||||
};
|
||||
|
||||
static void
|
||||
pageserver_connect()
|
||||
{
|
||||
@@ -154,60 +149,86 @@ retry:
|
||||
}
|
||||
|
||||
|
||||
static ZenithResponse *
|
||||
pageserver_call(ZenithRequest *request)
|
||||
static void
|
||||
pageserver_disconnect(void)
|
||||
{
|
||||
/*
|
||||
* If anything goes wrong while we were sending a request, it's not
|
||||
* clear what state the connection is in. For example, if we sent the
|
||||
* request but didn't receive a response yet, we might receive the
|
||||
* response some time later after we have already sent a new unrelated
|
||||
* request. Close the connection to avoid getting confused.
|
||||
*/
|
||||
if (connected)
|
||||
{
|
||||
neon_log(LOG, "dropping connection to page server due to error");
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
connected = false;
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
pageserver_send(ZenithRequest *request)
|
||||
{
|
||||
StringInfoData req_buff;
|
||||
|
||||
/* If the connection was lost for some reason, reconnect */
|
||||
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
|
||||
{
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
connected = false;
|
||||
}
|
||||
|
||||
if (!connected)
|
||||
pageserver_connect();
|
||||
|
||||
req_buff = zm_pack_request(request);
|
||||
|
||||
/*
|
||||
* Send request.
|
||||
*
|
||||
* In principle, this could block if the output buffer is full, and we
|
||||
* should use async mode and check for interrupts while waiting. In
|
||||
* practice, our requests are small enough to always fit in the output
|
||||
* and TCP buffer.
|
||||
*/
|
||||
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
|
||||
{
|
||||
char* msg = PQerrorMessage(pageserver_conn);
|
||||
pageserver_disconnect();
|
||||
neon_log(ERROR, "failed to send page request: %s", msg);
|
||||
}
|
||||
pfree(req_buff.data);
|
||||
|
||||
if (message_level_is_interesting(PageStoreTrace))
|
||||
{
|
||||
char *msg = zm_to_string((ZenithMessage *) request);
|
||||
neon_log(PageStoreTrace, "sent request: %s", msg);
|
||||
pfree(msg);
|
||||
}
|
||||
}
|
||||
|
||||
static ZenithResponse *
|
||||
pageserver_receive(void)
|
||||
{
|
||||
StringInfoData resp_buff;
|
||||
ZenithResponse *resp;
|
||||
|
||||
PG_TRY();
|
||||
{
|
||||
/* If the connection was lost for some reason, reconnect */
|
||||
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
|
||||
{
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
connected = false;
|
||||
}
|
||||
|
||||
if (!connected)
|
||||
pageserver_connect();
|
||||
|
||||
req_buff = zm_pack_request(request);
|
||||
|
||||
/*
|
||||
* Send request.
|
||||
*
|
||||
* In principle, this could block if the output buffer is full, and we
|
||||
* should use async mode and check for interrupts while waiting. In
|
||||
* practice, our requests are small enough to always fit in the output
|
||||
* and TCP buffer.
|
||||
*/
|
||||
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0 || PQflush(pageserver_conn))
|
||||
{
|
||||
neon_log(ERROR, "failed to send page request: %s",
|
||||
PQerrorMessage(pageserver_conn));
|
||||
}
|
||||
pfree(req_buff.data);
|
||||
|
||||
if (message_level_is_interesting(PageStoreTrace))
|
||||
{
|
||||
char *msg = zm_to_string((ZenithMessage *) request);
|
||||
|
||||
neon_log(PageStoreTrace, "sent request: %s", msg);
|
||||
pfree(msg);
|
||||
}
|
||||
|
||||
/* read response */
|
||||
resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data);
|
||||
resp_buff.cursor = 0;
|
||||
|
||||
if (resp_buff.len == -1)
|
||||
neon_log(ERROR, "end of COPY");
|
||||
else if (resp_buff.len == -2)
|
||||
neon_log(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));
|
||||
|
||||
if (resp_buff.len < 0)
|
||||
{
|
||||
if (resp_buff.len == -1)
|
||||
neon_log(ERROR, "end of COPY");
|
||||
else if (resp_buff.len == -2)
|
||||
neon_log(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));
|
||||
}
|
||||
resp = zm_unpack_response(&resp_buff);
|
||||
PQfreemem(resp_buff.data);
|
||||
|
||||
@@ -221,20 +242,7 @@ pageserver_call(ZenithRequest *request)
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
/*
|
||||
* If anything goes wrong while we were sending a request, it's not
|
||||
* clear what state the connection is in. For example, if we sent the
|
||||
* request but didn't receive a response yet, we might receive the
|
||||
* response some time later after we have already sent a new unrelated
|
||||
* request. Close the connection to avoid getting confused.
|
||||
*/
|
||||
if (connected)
|
||||
{
|
||||
neon_log(LOG, "dropping connection to page server due to error");
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
connected = false;
|
||||
}
|
||||
pageserver_disconnect();
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
@@ -243,6 +251,32 @@ pageserver_call(ZenithRequest *request)
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
pageserver_flush(void)
|
||||
{
|
||||
if (PQflush(pageserver_conn))
|
||||
{
|
||||
char* msg = PQerrorMessage(pageserver_conn);
|
||||
pageserver_disconnect();
|
||||
neon_log(ERROR, "failed to flush page requests: %s", msg);
|
||||
}
|
||||
}
|
||||
|
||||
static ZenithResponse *
|
||||
pageserver_call(ZenithRequest* request)
|
||||
{
|
||||
pageserver_send(request);
|
||||
pageserver_flush();
|
||||
return pageserver_receive();
|
||||
}
|
||||
|
||||
page_server_api api = {
|
||||
.request = pageserver_call,
|
||||
.send = pageserver_send,
|
||||
.flush = pageserver_flush,
|
||||
.receive = pageserver_receive
|
||||
};
|
||||
|
||||
static bool
|
||||
check_zenith_id(char **newval, void **extra, GucSource source)
|
||||
{
|
||||
|
||||
@@ -142,7 +142,10 @@ extern char *zm_to_string(ZenithMessage *msg);
|
||||
typedef struct
|
||||
{
|
||||
ZenithResponse *(*request) (ZenithRequest *request);
|
||||
} page_server_api;
|
||||
void (*send) (ZenithRequest *request);
|
||||
ZenithResponse *(*receive) (void);
|
||||
void (*flush) (void);
|
||||
} page_server_api;
|
||||
|
||||
extern page_server_api *page_server;
|
||||
|
||||
@@ -171,6 +174,7 @@ extern void zenith_extend(SMgrRelation reln, ForkNumber forknum,
|
||||
BlockNumber blocknum, char *buffer, bool skipFsync);
|
||||
extern bool zenith_prefetch(SMgrRelation reln, ForkNumber forknum,
|
||||
BlockNumber blocknum);
|
||||
extern void zenith_reset_prefetch(SMgrRelation reln);
|
||||
extern void zenith_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
char *buffer);
|
||||
|
||||
|
||||
@@ -57,6 +57,8 @@
|
||||
#include "postmaster/interrupt.h"
|
||||
#include "replication/walsender.h"
|
||||
#include "storage/bufmgr.h"
|
||||
#include "storage/relfilenode.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/md.h"
|
||||
#include "fmgr.h"
|
||||
#include "miscadmin.h"
|
||||
@@ -110,6 +112,49 @@ typedef enum
|
||||
static SMgrRelation unlogged_build_rel = NULL;
|
||||
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
||||
|
||||
|
||||
/*
|
||||
* Prefetch implementation:
|
||||
* Prefetch is performed locally by each backend.
|
||||
* There can be up to MAX_PREFETCH_REQUESTS registered using smgr_prefetch
|
||||
* before smgr_read. All this requests are appended to primary smgr_read request.
|
||||
* It is assumed that pages will be requested in prefetch order.
|
||||
* Reading of prefetch responses is delayed until them are actually needed (smgr_read).
|
||||
* It make it possible to parallelize processing and receiving of prefetched pages.
|
||||
* In case of prefetch miss or any other SMGR request other than smgr_read,
|
||||
* all prefetch responses has to be consumed.
|
||||
*/
|
||||
|
||||
#define MAX_PREFETCH_REQUESTS 128
|
||||
|
||||
BufferTag prefetch_requests[MAX_PREFETCH_REQUESTS];
|
||||
BufferTag prefetch_responses[MAX_PREFETCH_REQUESTS];
|
||||
int n_prefetch_requests;
|
||||
int n_prefetch_responses;
|
||||
int n_prefetched_buffers;
|
||||
int n_prefetch_hits;
|
||||
int n_prefetch_misses;
|
||||
XLogRecPtr prefetch_lsn;
|
||||
|
||||
static void
|
||||
consume_prefetch_responses(void)
|
||||
{
|
||||
for (int i = n_prefetched_buffers; i < n_prefetch_responses; i++) {
|
||||
ZenithResponse* resp = page_server->receive();
|
||||
pfree(resp);
|
||||
}
|
||||
n_prefetched_buffers = 0;
|
||||
n_prefetch_responses = 0;
|
||||
}
|
||||
|
||||
static ZenithResponse*
|
||||
page_server_request(void const* req)
|
||||
{
|
||||
consume_prefetch_responses();
|
||||
return page_server->request((ZenithRequest*)req);
|
||||
}
|
||||
|
||||
|
||||
StringInfoData
|
||||
zm_pack_request(ZenithRequest *msg)
|
||||
{
|
||||
@@ -735,7 +780,7 @@ zenith_exists(SMgrRelation reln, ForkNumber forkNum)
|
||||
.forknum = forkNum
|
||||
};
|
||||
|
||||
resp = page_server->request((ZenithRequest *) &request);
|
||||
resp = page_server_request(&request);
|
||||
}
|
||||
|
||||
switch (resp->tag)
|
||||
@@ -948,6 +993,16 @@ zenith_close(SMgrRelation reln, ForkNumber forknum)
|
||||
mdclose(reln, forknum);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* zenith_reset_prefetch() -- reoe all previously rgistered prefeth requests
|
||||
*/
|
||||
void
|
||||
zenith_reset_prefetch(SMgrRelation reln)
|
||||
{
|
||||
n_prefetch_requests = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* zenith_prefetch() -- Initiate asynchronous read of the specified block of a relation
|
||||
*/
|
||||
@@ -971,9 +1026,15 @@ zenith_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
|
||||
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
/* not implemented */
|
||||
elog(SmgrTrace, "[ZENITH_SMGR] prefetch noop");
|
||||
return true;
|
||||
if (n_prefetch_requests < MAX_PREFETCH_REQUESTS)
|
||||
{
|
||||
prefetch_requests[n_prefetch_requests].rnode = reln->smgr_rnode.node;
|
||||
prefetch_requests[n_prefetch_requests].forkNum = forknum;
|
||||
prefetch_requests[n_prefetch_requests].blockNum = blocknum;
|
||||
n_prefetch_requests += 1;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1022,7 +1083,47 @@ void zenith_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno
|
||||
XLogRecPtr request_lsn, bool request_latest, char *buffer)
|
||||
{
|
||||
ZenithResponse *resp;
|
||||
int i;
|
||||
|
||||
/*
|
||||
* Try to find prefetched page.
|
||||
* It is assumed that pages will be requested in the same order as them are prefetched,
|
||||
* but some other backend may load page in shared buffers, so some prefetch responses should
|
||||
* be skipped.
|
||||
*/
|
||||
for (i = n_prefetched_buffers; i < n_prefetch_responses; i++)
|
||||
{
|
||||
resp = page_server->receive();
|
||||
if (resp->tag == T_ZenithGetPageResponse &&
|
||||
RelFileNodeEquals(prefetch_responses[i].rnode, rnode) &&
|
||||
prefetch_responses[i].forkNum == forkNum &&
|
||||
prefetch_responses[i].blockNum == blkno)
|
||||
{
|
||||
char* page = ((ZenithGetPageResponse *) resp)->page;
|
||||
/*
|
||||
* Check if prefetched page is still relevant.
|
||||
* If it is updated by some other backend, then it should not
|
||||
* be requested from smgr unless it is evicted from shared buffers.
|
||||
* In the last case last_evicted_lsn should be updated and
|
||||
* request_lsn should be greater than prefetch_lsn.
|
||||
* Maximum with page LSN is used because page returned by page server
|
||||
* may have LSN either greater either smaller than requested.
|
||||
*/
|
||||
if (Max(prefetch_lsn, PageGetLSN(page)) >= request_lsn)
|
||||
{
|
||||
n_prefetched_buffers = i+1;
|
||||
n_prefetch_hits += 1;
|
||||
n_prefetch_requests = 0;
|
||||
memcpy(buffer, page, BLCKSZ);
|
||||
pfree(resp);
|
||||
return;
|
||||
}
|
||||
}
|
||||
pfree(resp);
|
||||
}
|
||||
n_prefetched_buffers = 0;
|
||||
n_prefetch_responses = 0;
|
||||
n_prefetch_misses += 1;
|
||||
{
|
||||
ZenithGetPageRequest request = {
|
||||
.req.tag = T_ZenithGetPageRequest,
|
||||
@@ -1032,10 +1133,29 @@ void zenith_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno
|
||||
.forknum = forkNum,
|
||||
.blkno = blkno
|
||||
};
|
||||
|
||||
resp = page_server->request((ZenithRequest *) &request);
|
||||
if (n_prefetch_requests > 0)
|
||||
{
|
||||
/* Combine all prefetch requests with primary request */
|
||||
page_server->send((ZenithRequest *) &request);
|
||||
for (i = 0; i < n_prefetch_requests; i++)
|
||||
{
|
||||
request.rnode = prefetch_requests[i].rnode;
|
||||
request.forknum = prefetch_requests[i].forkNum;
|
||||
request.blkno = prefetch_requests[i].blockNum;
|
||||
prefetch_responses[i] = prefetch_requests[i];
|
||||
page_server->send((ZenithRequest *) &request);
|
||||
}
|
||||
page_server->flush();
|
||||
n_prefetch_responses = n_prefetch_requests;
|
||||
n_prefetch_requests = 0;
|
||||
prefetch_lsn = request_lsn;
|
||||
resp = page_server->receive();
|
||||
}
|
||||
else
|
||||
{
|
||||
resp = page_server->request((ZenithRequest *) &request);
|
||||
}
|
||||
}
|
||||
|
||||
switch (resp->tag)
|
||||
{
|
||||
case T_ZenithGetPageResponse:
|
||||
@@ -1305,7 +1425,7 @@ zenith_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
.forknum = forknum,
|
||||
};
|
||||
|
||||
resp = page_server->request((ZenithRequest *) &request);
|
||||
resp = page_server_request(&request);
|
||||
}
|
||||
|
||||
switch (resp->tag)
|
||||
@@ -1365,7 +1485,7 @@ zenith_dbsize(Oid dbNode)
|
||||
.dbNode = dbNode,
|
||||
};
|
||||
|
||||
resp = page_server->request((ZenithRequest *) &request);
|
||||
resp = page_server_request(&request);
|
||||
}
|
||||
|
||||
switch (resp->tag)
|
||||
@@ -1680,6 +1800,7 @@ static const struct f_smgr zenith_smgr =
|
||||
.smgr_unlink = zenith_unlink,
|
||||
.smgr_extend = zenith_extend,
|
||||
.smgr_prefetch = zenith_prefetch,
|
||||
.smgr_reset_prefetch = zenith_reset_prefetch,
|
||||
.smgr_read = zenith_read,
|
||||
.smgr_write = zenith_write,
|
||||
.smgr_writeback = zenith_writeback,
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: e8518d3fc8...114676d2ed
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 313769bb62...b1dbd93e2b
Reference in New Issue
Block a user