mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-23 22:29:58 +00:00
Changes of neon extension to support local prefetch (#2369)
* Changes of neon extension to support local prefetch * Catch exceptions in pageserver_receive * Bump postgres version * Bump postgres version * Bump postgres version * Bump postgres version
This commit is contained in:
committed by
GitHub
parent
4f7557fb58
commit
f44afbaf62
@@ -43,11 +43,6 @@ PGconn *pageserver_conn = NULL;
|
|||||||
|
|
||||||
char *page_server_connstring_raw;
|
char *page_server_connstring_raw;
|
||||||
|
|
||||||
static ZenithResponse *pageserver_call(ZenithRequest *request);
|
|
||||||
page_server_api api = {
|
|
||||||
.request = pageserver_call
|
|
||||||
};
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
pageserver_connect()
|
pageserver_connect()
|
||||||
{
|
{
|
||||||
@@ -154,60 +149,86 @@ retry:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static ZenithResponse *
|
static void
|
||||||
pageserver_call(ZenithRequest *request)
|
pageserver_disconnect(void)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* If anything goes wrong while we were sending a request, it's not
|
||||||
|
* clear what state the connection is in. For example, if we sent the
|
||||||
|
* request but didn't receive a response yet, we might receive the
|
||||||
|
* response some time later after we have already sent a new unrelated
|
||||||
|
* request. Close the connection to avoid getting confused.
|
||||||
|
*/
|
||||||
|
if (connected)
|
||||||
|
{
|
||||||
|
neon_log(LOG, "dropping connection to page server due to error");
|
||||||
|
PQfinish(pageserver_conn);
|
||||||
|
pageserver_conn = NULL;
|
||||||
|
connected = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
pageserver_send(ZenithRequest *request)
|
||||||
{
|
{
|
||||||
StringInfoData req_buff;
|
StringInfoData req_buff;
|
||||||
|
|
||||||
|
/* If the connection was lost for some reason, reconnect */
|
||||||
|
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
|
||||||
|
{
|
||||||
|
PQfinish(pageserver_conn);
|
||||||
|
pageserver_conn = NULL;
|
||||||
|
connected = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!connected)
|
||||||
|
pageserver_connect();
|
||||||
|
|
||||||
|
req_buff = zm_pack_request(request);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Send request.
|
||||||
|
*
|
||||||
|
* In principle, this could block if the output buffer is full, and we
|
||||||
|
* should use async mode and check for interrupts while waiting. In
|
||||||
|
* practice, our requests are small enough to always fit in the output
|
||||||
|
* and TCP buffer.
|
||||||
|
*/
|
||||||
|
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
|
||||||
|
{
|
||||||
|
char* msg = PQerrorMessage(pageserver_conn);
|
||||||
|
pageserver_disconnect();
|
||||||
|
neon_log(ERROR, "failed to send page request: %s", msg);
|
||||||
|
}
|
||||||
|
pfree(req_buff.data);
|
||||||
|
|
||||||
|
if (message_level_is_interesting(PageStoreTrace))
|
||||||
|
{
|
||||||
|
char *msg = zm_to_string((ZenithMessage *) request);
|
||||||
|
neon_log(PageStoreTrace, "sent request: %s", msg);
|
||||||
|
pfree(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static ZenithResponse *
|
||||||
|
pageserver_receive(void)
|
||||||
|
{
|
||||||
StringInfoData resp_buff;
|
StringInfoData resp_buff;
|
||||||
ZenithResponse *resp;
|
ZenithResponse *resp;
|
||||||
|
|
||||||
PG_TRY();
|
PG_TRY();
|
||||||
{
|
{
|
||||||
/* If the connection was lost for some reason, reconnect */
|
|
||||||
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
|
|
||||||
{
|
|
||||||
PQfinish(pageserver_conn);
|
|
||||||
pageserver_conn = NULL;
|
|
||||||
connected = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!connected)
|
|
||||||
pageserver_connect();
|
|
||||||
|
|
||||||
req_buff = zm_pack_request(request);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Send request.
|
|
||||||
*
|
|
||||||
* In principle, this could block if the output buffer is full, and we
|
|
||||||
* should use async mode and check for interrupts while waiting. In
|
|
||||||
* practice, our requests are small enough to always fit in the output
|
|
||||||
* and TCP buffer.
|
|
||||||
*/
|
|
||||||
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0 || PQflush(pageserver_conn))
|
|
||||||
{
|
|
||||||
neon_log(ERROR, "failed to send page request: %s",
|
|
||||||
PQerrorMessage(pageserver_conn));
|
|
||||||
}
|
|
||||||
pfree(req_buff.data);
|
|
||||||
|
|
||||||
if (message_level_is_interesting(PageStoreTrace))
|
|
||||||
{
|
|
||||||
char *msg = zm_to_string((ZenithMessage *) request);
|
|
||||||
|
|
||||||
neon_log(PageStoreTrace, "sent request: %s", msg);
|
|
||||||
pfree(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* read response */
|
/* read response */
|
||||||
resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data);
|
resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data);
|
||||||
resp_buff.cursor = 0;
|
resp_buff.cursor = 0;
|
||||||
|
|
||||||
if (resp_buff.len == -1)
|
if (resp_buff.len < 0)
|
||||||
neon_log(ERROR, "end of COPY");
|
{
|
||||||
else if (resp_buff.len == -2)
|
if (resp_buff.len == -1)
|
||||||
neon_log(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));
|
neon_log(ERROR, "end of COPY");
|
||||||
|
else if (resp_buff.len == -2)
|
||||||
|
neon_log(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));
|
||||||
|
}
|
||||||
resp = zm_unpack_response(&resp_buff);
|
resp = zm_unpack_response(&resp_buff);
|
||||||
PQfreemem(resp_buff.data);
|
PQfreemem(resp_buff.data);
|
||||||
|
|
||||||
@@ -221,20 +242,7 @@ pageserver_call(ZenithRequest *request)
|
|||||||
}
|
}
|
||||||
PG_CATCH();
|
PG_CATCH();
|
||||||
{
|
{
|
||||||
/*
|
pageserver_disconnect();
|
||||||
* If anything goes wrong while we were sending a request, it's not
|
|
||||||
* clear what state the connection is in. For example, if we sent the
|
|
||||||
* request but didn't receive a response yet, we might receive the
|
|
||||||
* response some time later after we have already sent a new unrelated
|
|
||||||
* request. Close the connection to avoid getting confused.
|
|
||||||
*/
|
|
||||||
if (connected)
|
|
||||||
{
|
|
||||||
neon_log(LOG, "dropping connection to page server due to error");
|
|
||||||
PQfinish(pageserver_conn);
|
|
||||||
pageserver_conn = NULL;
|
|
||||||
connected = false;
|
|
||||||
}
|
|
||||||
PG_RE_THROW();
|
PG_RE_THROW();
|
||||||
}
|
}
|
||||||
PG_END_TRY();
|
PG_END_TRY();
|
||||||
@@ -243,6 +251,32 @@ pageserver_call(ZenithRequest *request)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
pageserver_flush(void)
|
||||||
|
{
|
||||||
|
if (PQflush(pageserver_conn))
|
||||||
|
{
|
||||||
|
char* msg = PQerrorMessage(pageserver_conn);
|
||||||
|
pageserver_disconnect();
|
||||||
|
neon_log(ERROR, "failed to flush page requests: %s", msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static ZenithResponse *
|
||||||
|
pageserver_call(ZenithRequest* request)
|
||||||
|
{
|
||||||
|
pageserver_send(request);
|
||||||
|
pageserver_flush();
|
||||||
|
return pageserver_receive();
|
||||||
|
}
|
||||||
|
|
||||||
|
page_server_api api = {
|
||||||
|
.request = pageserver_call,
|
||||||
|
.send = pageserver_send,
|
||||||
|
.flush = pageserver_flush,
|
||||||
|
.receive = pageserver_receive
|
||||||
|
};
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
check_zenith_id(char **newval, void **extra, GucSource source)
|
check_zenith_id(char **newval, void **extra, GucSource source)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -142,7 +142,10 @@ extern char *zm_to_string(ZenithMessage *msg);
|
|||||||
typedef struct
|
typedef struct
|
||||||
{
|
{
|
||||||
ZenithResponse *(*request) (ZenithRequest *request);
|
ZenithResponse *(*request) (ZenithRequest *request);
|
||||||
} page_server_api;
|
void (*send) (ZenithRequest *request);
|
||||||
|
ZenithResponse *(*receive) (void);
|
||||||
|
void (*flush) (void);
|
||||||
|
} page_server_api;
|
||||||
|
|
||||||
extern page_server_api *page_server;
|
extern page_server_api *page_server;
|
||||||
|
|
||||||
@@ -171,6 +174,7 @@ extern void zenith_extend(SMgrRelation reln, ForkNumber forknum,
|
|||||||
BlockNumber blocknum, char *buffer, bool skipFsync);
|
BlockNumber blocknum, char *buffer, bool skipFsync);
|
||||||
extern bool zenith_prefetch(SMgrRelation reln, ForkNumber forknum,
|
extern bool zenith_prefetch(SMgrRelation reln, ForkNumber forknum,
|
||||||
BlockNumber blocknum);
|
BlockNumber blocknum);
|
||||||
|
extern void zenith_reset_prefetch(SMgrRelation reln);
|
||||||
extern void zenith_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
extern void zenith_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||||
char *buffer);
|
char *buffer);
|
||||||
|
|
||||||
|
|||||||
@@ -57,6 +57,8 @@
|
|||||||
#include "postmaster/interrupt.h"
|
#include "postmaster/interrupt.h"
|
||||||
#include "replication/walsender.h"
|
#include "replication/walsender.h"
|
||||||
#include "storage/bufmgr.h"
|
#include "storage/bufmgr.h"
|
||||||
|
#include "storage/relfilenode.h"
|
||||||
|
#include "storage/buf_internals.h"
|
||||||
#include "storage/md.h"
|
#include "storage/md.h"
|
||||||
#include "fmgr.h"
|
#include "fmgr.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
@@ -110,6 +112,49 @@ typedef enum
|
|||||||
static SMgrRelation unlogged_build_rel = NULL;
|
static SMgrRelation unlogged_build_rel = NULL;
|
||||||
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Prefetch implementation:
|
||||||
|
* Prefetch is performed locally by each backend.
|
||||||
|
* There can be up to MAX_PREFETCH_REQUESTS registered using smgr_prefetch
|
||||||
|
* before smgr_read. All these requests are appended to the primary smgr_read request.
|
||||||
|
* It is assumed that pages will be requested in prefetch order.
|
||||||
|
* Reading of prefetch responses is delayed until they are actually needed (smgr_read).
|
||||||
|
* This makes it possible to parallelize processing and receiving of prefetched pages.
|
||||||
|
* In case of prefetch miss or any other SMGR request other than smgr_read,
|
||||||
|
* all prefetch responses have to be consumed.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define MAX_PREFETCH_REQUESTS 128
|
||||||
|
|
||||||
|
BufferTag prefetch_requests[MAX_PREFETCH_REQUESTS];
|
||||||
|
BufferTag prefetch_responses[MAX_PREFETCH_REQUESTS];
|
||||||
|
int n_prefetch_requests;
|
||||||
|
int n_prefetch_responses;
|
||||||
|
int n_prefetched_buffers;
|
||||||
|
int n_prefetch_hits;
|
||||||
|
int n_prefetch_misses;
|
||||||
|
XLogRecPtr prefetch_lsn;
|
||||||
|
|
||||||
|
static void
|
||||||
|
consume_prefetch_responses(void)
|
||||||
|
{
|
||||||
|
for (int i = n_prefetched_buffers; i < n_prefetch_responses; i++) {
|
||||||
|
ZenithResponse* resp = page_server->receive();
|
||||||
|
pfree(resp);
|
||||||
|
}
|
||||||
|
n_prefetched_buffers = 0;
|
||||||
|
n_prefetch_responses = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static ZenithResponse*
|
||||||
|
page_server_request(void const* req)
|
||||||
|
{
|
||||||
|
consume_prefetch_responses();
|
||||||
|
return page_server->request((ZenithRequest*)req);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
StringInfoData
|
StringInfoData
|
||||||
zm_pack_request(ZenithRequest *msg)
|
zm_pack_request(ZenithRequest *msg)
|
||||||
{
|
{
|
||||||
@@ -735,7 +780,7 @@ zenith_exists(SMgrRelation reln, ForkNumber forkNum)
|
|||||||
.forknum = forkNum
|
.forknum = forkNum
|
||||||
};
|
};
|
||||||
|
|
||||||
resp = page_server->request((ZenithRequest *) &request);
|
resp = page_server_request(&request);
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (resp->tag)
|
switch (resp->tag)
|
||||||
@@ -948,6 +993,16 @@ zenith_close(SMgrRelation reln, ForkNumber forknum)
|
|||||||
mdclose(reln, forknum);
|
mdclose(reln, forknum);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* zenith_reset_prefetch() -- remove all previously registered prefetch requests
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
zenith_reset_prefetch(SMgrRelation reln)
|
||||||
|
{
|
||||||
|
n_prefetch_requests = 0;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* zenith_prefetch() -- Initiate asynchronous read of the specified block of a relation
|
* zenith_prefetch() -- Initiate asynchronous read of the specified block of a relation
|
||||||
*/
|
*/
|
||||||
@@ -971,9 +1026,15 @@ zenith_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
|
|||||||
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* not implemented */
|
if (n_prefetch_requests < MAX_PREFETCH_REQUESTS)
|
||||||
elog(SmgrTrace, "[ZENITH_SMGR] prefetch noop");
|
{
|
||||||
return true;
|
prefetch_requests[n_prefetch_requests].rnode = reln->smgr_rnode.node;
|
||||||
|
prefetch_requests[n_prefetch_requests].forkNum = forknum;
|
||||||
|
prefetch_requests[n_prefetch_requests].blockNum = blocknum;
|
||||||
|
n_prefetch_requests += 1;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@@ -1022,7 +1083,47 @@ void zenith_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno
|
|||||||
XLogRecPtr request_lsn, bool request_latest, char *buffer)
|
XLogRecPtr request_lsn, bool request_latest, char *buffer)
|
||||||
{
|
{
|
||||||
ZenithResponse *resp;
|
ZenithResponse *resp;
|
||||||
|
int i;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Try to find prefetched page.
|
||||||
|
* It is assumed that pages will be requested in the same order as they are prefetched,
|
||||||
|
* but some other backend may load a page into shared buffers, so some prefetch responses should
|
||||||
|
* be skipped.
|
||||||
|
*/
|
||||||
|
for (i = n_prefetched_buffers; i < n_prefetch_responses; i++)
|
||||||
|
{
|
||||||
|
resp = page_server->receive();
|
||||||
|
if (resp->tag == T_ZenithGetPageResponse &&
|
||||||
|
RelFileNodeEquals(prefetch_responses[i].rnode, rnode) &&
|
||||||
|
prefetch_responses[i].forkNum == forkNum &&
|
||||||
|
prefetch_responses[i].blockNum == blkno)
|
||||||
|
{
|
||||||
|
char* page = ((ZenithGetPageResponse *) resp)->page;
|
||||||
|
/*
|
||||||
|
* Check if prefetched page is still relevant.
|
||||||
|
* If it is updated by some other backend, then it should not
|
||||||
|
* be requested from smgr unless it is evicted from shared buffers.
|
||||||
|
* In the latter case last_evicted_lsn should be updated and
|
||||||
|
* request_lsn should be greater than prefetch_lsn.
|
||||||
|
* Maximum with page LSN is used because page returned by page server
|
||||||
|
* may have an LSN either greater or smaller than requested.
|
||||||
|
*/
|
||||||
|
if (Max(prefetch_lsn, PageGetLSN(page)) >= request_lsn)
|
||||||
|
{
|
||||||
|
n_prefetched_buffers = i+1;
|
||||||
|
n_prefetch_hits += 1;
|
||||||
|
n_prefetch_requests = 0;
|
||||||
|
memcpy(buffer, page, BLCKSZ);
|
||||||
|
pfree(resp);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pfree(resp);
|
||||||
|
}
|
||||||
|
n_prefetched_buffers = 0;
|
||||||
|
n_prefetch_responses = 0;
|
||||||
|
n_prefetch_misses += 1;
|
||||||
{
|
{
|
||||||
ZenithGetPageRequest request = {
|
ZenithGetPageRequest request = {
|
||||||
.req.tag = T_ZenithGetPageRequest,
|
.req.tag = T_ZenithGetPageRequest,
|
||||||
@@ -1032,10 +1133,29 @@ void zenith_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno
|
|||||||
.forknum = forkNum,
|
.forknum = forkNum,
|
||||||
.blkno = blkno
|
.blkno = blkno
|
||||||
};
|
};
|
||||||
|
if (n_prefetch_requests > 0)
|
||||||
resp = page_server->request((ZenithRequest *) &request);
|
{
|
||||||
|
/* Combine all prefetch requests with primary request */
|
||||||
|
page_server->send((ZenithRequest *) &request);
|
||||||
|
for (i = 0; i < n_prefetch_requests; i++)
|
||||||
|
{
|
||||||
|
request.rnode = prefetch_requests[i].rnode;
|
||||||
|
request.forknum = prefetch_requests[i].forkNum;
|
||||||
|
request.blkno = prefetch_requests[i].blockNum;
|
||||||
|
prefetch_responses[i] = prefetch_requests[i];
|
||||||
|
page_server->send((ZenithRequest *) &request);
|
||||||
|
}
|
||||||
|
page_server->flush();
|
||||||
|
n_prefetch_responses = n_prefetch_requests;
|
||||||
|
n_prefetch_requests = 0;
|
||||||
|
prefetch_lsn = request_lsn;
|
||||||
|
resp = page_server->receive();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
resp = page_server->request((ZenithRequest *) &request);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (resp->tag)
|
switch (resp->tag)
|
||||||
{
|
{
|
||||||
case T_ZenithGetPageResponse:
|
case T_ZenithGetPageResponse:
|
||||||
@@ -1305,7 +1425,7 @@ zenith_nblocks(SMgrRelation reln, ForkNumber forknum)
|
|||||||
.forknum = forknum,
|
.forknum = forknum,
|
||||||
};
|
};
|
||||||
|
|
||||||
resp = page_server->request((ZenithRequest *) &request);
|
resp = page_server_request(&request);
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (resp->tag)
|
switch (resp->tag)
|
||||||
@@ -1365,7 +1485,7 @@ zenith_dbsize(Oid dbNode)
|
|||||||
.dbNode = dbNode,
|
.dbNode = dbNode,
|
||||||
};
|
};
|
||||||
|
|
||||||
resp = page_server->request((ZenithRequest *) &request);
|
resp = page_server_request(&request);
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (resp->tag)
|
switch (resp->tag)
|
||||||
@@ -1680,6 +1800,7 @@ static const struct f_smgr zenith_smgr =
|
|||||||
.smgr_unlink = zenith_unlink,
|
.smgr_unlink = zenith_unlink,
|
||||||
.smgr_extend = zenith_extend,
|
.smgr_extend = zenith_extend,
|
||||||
.smgr_prefetch = zenith_prefetch,
|
.smgr_prefetch = zenith_prefetch,
|
||||||
|
.smgr_reset_prefetch = zenith_reset_prefetch,
|
||||||
.smgr_read = zenith_read,
|
.smgr_read = zenith_read,
|
||||||
.smgr_write = zenith_write,
|
.smgr_write = zenith_write,
|
||||||
.smgr_writeback = zenith_writeback,
|
.smgr_writeback = zenith_writeback,
|
||||||
|
|||||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: e8518d3fc8...114676d2ed
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 313769bb62...b1dbd93e2b
Reference in New Issue
Block a user