diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c
index 649fc1037e..d0572e66cb 100644
--- a/pgxn/neon/libpagestore.c
+++ b/pgxn/neon/libpagestore.c
@@ -43,11 +43,6 @@
 PGconn	   *pageserver_conn = NULL;
 char	   *page_server_connstring_raw;
 
-static ZenithResponse *pageserver_call(ZenithRequest *request);
-page_server_api api = {
-	.request = pageserver_call
-};
-
 static void
 pageserver_connect()
 {
@@ -154,60 +149,87 @@ retry:
 }
 
-static ZenithResponse *
-pageserver_call(ZenithRequest *request)
+static void
+pageserver_disconnect(void)
+{
+	/*
+	 * If anything goes wrong while we were sending a request, it's not
+	 * clear what state the connection is in. For example, if we sent the
+	 * request but didn't receive a response yet, we might receive the
+	 * response some time later after we have already sent a new unrelated
+	 * request. Close the connection to avoid getting confused.
+	 */
+	if (connected)
+	{
+		neon_log(LOG, "dropping connection to page server due to error");
+		PQfinish(pageserver_conn);
+		pageserver_conn = NULL;
+		connected = false;
+	}
+}
+
+static void
+pageserver_send(ZenithRequest *request)
 {
 	StringInfoData req_buff;
+
+	/* If the connection was lost for some reason, reconnect */
+	if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
+	{
+		PQfinish(pageserver_conn);
+		pageserver_conn = NULL;
+		connected = false;
+	}
+
+	if (!connected)
+		pageserver_connect();
+
+	req_buff = zm_pack_request(request);
+
+	/*
+	 * Send request.
+	 *
+	 * In principle, this could block if the output buffer is full, and we
+	 * should use async mode and check for interrupts while waiting. In
+	 * practice, our requests are small enough to always fit in the output
+	 * and TCP buffer.
+	 */
+	if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
+	{
+		/* copy the error message before PQfinish() frees it */
+		char	   *msg = pstrdup(PQerrorMessage(pageserver_conn));
+		pageserver_disconnect();
+		neon_log(ERROR, "failed to send page request: %s", msg);
+	}
+	pfree(req_buff.data);
+
+	if (message_level_is_interesting(PageStoreTrace))
+	{
+		char	   *msg = zm_to_string((ZenithMessage *) request);
+
+		neon_log(PageStoreTrace, "sent request: %s", msg);
+		pfree(msg);
+	}
+}
+
+static ZenithResponse *
+pageserver_receive(void)
+{
 	StringInfoData resp_buff;
 	ZenithResponse *resp;
 
 	PG_TRY();
 	{
-		/* If the connection was lost for some reason, reconnect */
-		if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
-		{
-			PQfinish(pageserver_conn);
-			pageserver_conn = NULL;
-			connected = false;
-		}
-
-		if (!connected)
-			pageserver_connect();
-
-		req_buff = zm_pack_request(request);
-
-		/*
-		 * Send request.
-		 *
-		 * In principle, this could block if the output buffer is full, and we
-		 * should use async mode and check for interrupts while waiting. In
-		 * practice, our requests are small enough to always fit in the output
-		 * and TCP buffer.
-		 */
-		if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0 || PQflush(pageserver_conn))
-		{
-			neon_log(ERROR, "failed to send page request: %s",
-					 PQerrorMessage(pageserver_conn));
-		}
-		pfree(req_buff.data);
-
-		if (message_level_is_interesting(PageStoreTrace))
-		{
-			char	   *msg = zm_to_string((ZenithMessage *) request);
-
-			neon_log(PageStoreTrace, "sent request: %s", msg);
-			pfree(msg);
-		}
-
 		/* read response */
 		resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data);
 		resp_buff.cursor = 0;
 
-		if (resp_buff.len == -1)
-			neon_log(ERROR, "end of COPY");
-		else if (resp_buff.len == -2)
-			neon_log(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));
-
+		if (resp_buff.len < 0)
+		{
+			if (resp_buff.len == -1)
+				neon_log(ERROR, "end of COPY");
+			else if (resp_buff.len == -2)
+				neon_log(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));
+		}
 		resp = zm_unpack_response(&resp_buff);
 		PQfreemem(resp_buff.data);
@@ -221,20 +243,7 @@ pageserver_call(ZenithRequest *request)
 	}
 	PG_CATCH();
 	{
-		/*
-		 * If anything goes wrong while we were sending a request, it's not
-		 * clear what state the connection is in. For example, if we sent the
-		 * request but didn't receive a response yet, we might receive the
-		 * response some time later after we have already sent a new unrelated
-		 * request. Close the connection to avoid getting confused.
-		 */
-		if (connected)
-		{
-			neon_log(LOG, "dropping connection to page server due to error");
-			PQfinish(pageserver_conn);
-			pageserver_conn = NULL;
-			connected = false;
-		}
+		pageserver_disconnect();
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
@@ -243,6 +252,33 @@ pageserver_call(ZenithRequest *request)
 }
 
+static void
+pageserver_flush(void)
+{
+	if (PQflush(pageserver_conn))
+	{
+		/* copy the error message before PQfinish() frees it */
+		char	   *msg = pstrdup(PQerrorMessage(pageserver_conn));
+		pageserver_disconnect();
+		neon_log(ERROR, "failed to flush page requests: %s", msg);
+	}
+}
+
+static ZenithResponse *
+pageserver_call(ZenithRequest *request)
+{
+	pageserver_send(request);
+	pageserver_flush();
+	return pageserver_receive();
+}
+
+page_server_api api = {
+	.request = pageserver_call,
+	.send = pageserver_send,
+	.flush = pageserver_flush,
+	.receive = pageserver_receive
+};
+
 static bool
 check_zenith_id(char **newval, void **extra, GucSource source)
 {
diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h
index 93ea6771eb..5b21abc1bd 100644
--- a/pgxn/neon/pagestore_client.h
+++ b/pgxn/neon/pagestore_client.h
@@ -142,7 +142,10 @@ extern char *zm_to_string(ZenithMessage *msg);
 typedef struct
 {
 	ZenithResponse *(*request) (ZenithRequest *request);
-} page_server_api;
+	void		(*send) (ZenithRequest *request);
+	ZenithResponse *(*receive) (void);
+	void		(*flush) (void);
+} page_server_api;
 
 extern page_server_api *page_server;
 
@@ -171,6 +174,7 @@ extern void zenith_extend(SMgrRelation reln, ForkNumber forknum,
 						  BlockNumber blocknum, char *buffer, bool skipFsync);
 extern bool zenith_prefetch(SMgrRelation reln, ForkNumber forknum,
 							BlockNumber blocknum);
+extern void zenith_reset_prefetch(SMgrRelation reln);
 extern void zenith_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 						char *buffer);
 
diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c
index d49df7af58..ebf899dfdb 100644
--- a/pgxn/neon/pagestore_smgr.c
+++ b/pgxn/neon/pagestore_smgr.c
@@ -57,6 +57,8 @@
 #include "postmaster/interrupt.h"
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
+#include "storage/relfilenode.h"
+#include "storage/buf_internals.h"
 #include "storage/md.h"
"storage/md.h" #include "fmgr.h" #include "miscadmin.h" @@ -110,6 +112,49 @@ typedef enum static SMgrRelation unlogged_build_rel = NULL; static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS; + +/* + * Prefetch implementation: + * Prefetch is performed locally by each backend. + * There can be up to MAX_PREFETCH_REQUESTS registered using smgr_prefetch + * before smgr_read. All this requests are appended to primary smgr_read request. + * It is assumed that pages will be requested in prefetch order. + * Reading of prefetch responses is delayed until them are actually needed (smgr_read). + * It make it possible to parallelize processing and receiving of prefetched pages. + * In case of prefetch miss or any other SMGR request other than smgr_read, + * all prefetch responses has to be consumed. + */ + +#define MAX_PREFETCH_REQUESTS 128 + +BufferTag prefetch_requests[MAX_PREFETCH_REQUESTS]; +BufferTag prefetch_responses[MAX_PREFETCH_REQUESTS]; +int n_prefetch_requests; +int n_prefetch_responses; +int n_prefetched_buffers; +int n_prefetch_hits; +int n_prefetch_misses; +XLogRecPtr prefetch_lsn; + +static void +consume_prefetch_responses(void) +{ + for (int i = n_prefetched_buffers; i < n_prefetch_responses; i++) { + ZenithResponse* resp = page_server->receive(); + pfree(resp); + } + n_prefetched_buffers = 0; + n_prefetch_responses = 0; +} + +static ZenithResponse* +page_server_request(void const* req) +{ + consume_prefetch_responses(); + return page_server->request((ZenithRequest*)req); +} + + StringInfoData zm_pack_request(ZenithRequest *msg) { @@ -735,7 +780,7 @@ zenith_exists(SMgrRelation reln, ForkNumber forkNum) .forknum = forkNum }; - resp = page_server->request((ZenithRequest *) &request); + resp = page_server_request(&request); } switch (resp->tag) @@ -948,6 +993,16 @@ zenith_close(SMgrRelation reln, ForkNumber forknum) mdclose(reln, forknum); } + +/* + * zenith_reset_prefetch() -- reoe all previously rgistered prefeth requests + */ +void +zenith_reset_prefetch(SMgrRelation reln) +{ + n_prefetch_requests = 0; +} + /* * zenith_prefetch() -- Initiate asynchronous read of the specified block of a relation */ @@ -971,9 +1026,15 @@ zenith_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } - /* not implemented */ - elog(SmgrTrace, "[ZENITH_SMGR] prefetch noop"); - return true; + if (n_prefetch_requests < MAX_PREFETCH_REQUESTS) + { + prefetch_requests[n_prefetch_requests].rnode = reln->smgr_rnode.node; + prefetch_requests[n_prefetch_requests].forkNum = forknum; + prefetch_requests[n_prefetch_requests].blockNum = blocknum; + n_prefetch_requests += 1; + return true; + } + return false; } /* @@ -1022,7 +1083,47 @@ void zenith_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno XLogRecPtr request_lsn, bool request_latest, char *buffer) { ZenithResponse *resp; + int i; + /* + * Try to find prefetched page. + * It is assumed that pages will be requested in the same order as them are prefetched, + * but some other backend may load page in shared buffers, so some prefetch responses should + * be skipped. 
+	 */
+	for (i = n_prefetched_buffers; i < n_prefetch_responses; i++)
+	{
+		resp = page_server->receive();
+		if (resp->tag == T_ZenithGetPageResponse &&
+			RelFileNodeEquals(prefetch_responses[i].rnode, rnode) &&
+			prefetch_responses[i].forkNum == forkNum &&
+			prefetch_responses[i].blockNum == blkno)
+		{
+			char	   *page = ((ZenithGetPageResponse *) resp)->page;
+			/*
+			 * Check that the prefetched page is still relevant. If it was
+			 * updated by some other backend, it will not be requested from
+			 * smgr unless it has been evicted from shared buffers. In the
+			 * latter case last_evicted_lsn is updated and request_lsn will
+			 * be greater than prefetch_lsn. We take the maximum with the
+			 * page LSN because a page returned by the page server may have
+			 * an LSN either greater or smaller than the requested one.
+			 */
+			if (Max(prefetch_lsn, PageGetLSN(page)) >= request_lsn)
+			{
+				n_prefetched_buffers = i + 1;
+				n_prefetch_hits += 1;
+				n_prefetch_requests = 0;
+				memcpy(buffer, page, BLCKSZ);
+				pfree(resp);
+				return;
+			}
+		}
+		pfree(resp);
+	}
+	n_prefetched_buffers = 0;
+	n_prefetch_responses = 0;
+	n_prefetch_misses += 1;
 
 	{
 		ZenithGetPageRequest request = {
 			.req.tag = T_ZenithGetPageRequest,
@@ -1032,10 +1133,29 @@ void zenith_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno
 			.forknum = forkNum,
 			.blkno = blkno
 		};
-
-		resp = page_server->request((ZenithRequest *) &request);
+		if (n_prefetch_requests > 0)
+		{
+			/* Combine all prefetch requests with the primary request */
+			page_server->send((ZenithRequest *) &request);
+			for (i = 0; i < n_prefetch_requests; i++)
+			{
+				request.rnode = prefetch_requests[i].rnode;
+				request.forknum = prefetch_requests[i].forkNum;
+				request.blkno = prefetch_requests[i].blockNum;
+				prefetch_responses[i] = prefetch_requests[i];
+				page_server->send((ZenithRequest *) &request);
+			}
+			page_server->flush();
+			n_prefetch_responses = n_prefetch_requests;
+			n_prefetch_requests = 0;
+			prefetch_lsn = request_lsn;
+			resp = page_server->receive();
+		}
+		else
+		{
+			resp = page_server->request((ZenithRequest *) &request);
+		}
 	}
-
 	switch (resp->tag)
 	{
 		case T_ZenithGetPageResponse:
@@ -1305,7 +1425,7 @@ zenith_nblocks(SMgrRelation reln, ForkNumber forknum)
 			.forknum = forknum,
 		};
 
-		resp = page_server->request((ZenithRequest *) &request);
+		resp = page_server_request(&request);
 	}
 
 	switch (resp->tag)
@@ -1365,7 +1485,7 @@ zenith_dbsize(Oid dbNode)
 			.dbNode = dbNode,
 		};
 
-		resp = page_server->request((ZenithRequest *) &request);
+		resp = page_server_request(&request);
 	}
 
 	switch (resp->tag)
@@ -1680,6 +1800,7 @@ static const struct f_smgr zenith_smgr =
 	.smgr_unlink = zenith_unlink,
 	.smgr_extend = zenith_extend,
 	.smgr_prefetch = zenith_prefetch,
+	.smgr_reset_prefetch = zenith_reset_prefetch,
 	.smgr_read = zenith_read,
 	.smgr_write = zenith_write,
 	.smgr_writeback = zenith_writeback,
diff --git a/vendor/postgres-v14 b/vendor/postgres-v14
index e8518d3fc8..114676d2ed 160000
--- a/vendor/postgres-v14
+++ b/vendor/postgres-v14
@@ -1 +1 @@
-Subproject commit e8518d3fc85e3da420d2f5a2742a21386e6585ec
+Subproject commit 114676d2edd5307226d9448ec467821fdb77467d
diff --git a/vendor/postgres-v15 b/vendor/postgres-v15
index 313769bb62..b1dbd93e2b 160000
--- a/vendor/postgres-v15
+++ b/vendor/postgres-v15
@@ -1 +1 @@
-Subproject commit 313769bb6229f46380e24d8f6ff535f9185458af
+Subproject commit b1dbd93e2b1691e93860f7e59b9e1fe5a6e79786
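Note: the minimal sketch below is not part of the patch; it only illustrates how the new page_server_api callbacks are meant to be combined. A caller queues several requests with send(), pushes them to the pageserver in one network write with flush(), and then drains the responses in order with receive(). The helper name pipeline_get_pages and its caller-supplied request/buffer arrays are hypothetical; the api struct, message types, and BLCKSZ come from the patch, and pagestore_client.h is assumed to be included.

/* Hypothetical helper, not in the patch: fetch nreqs pages in one round trip. */
static void
pipeline_get_pages(ZenithGetPageRequest *reqs, int nreqs, char **buffers)
{
	int			i;

	/* Queue all requests; nothing is guaranteed to hit the wire yet. */
	for (i = 0; i < nreqs; i++)
		page_server->send((ZenithRequest *) &reqs[i]);

	/* One flush pushes the whole batch out in a single network write. */
	page_server->flush();

	/* The pageserver answers in request order on the same connection. */
	for (i = 0; i < nreqs; i++)
	{
		ZenithResponse *resp = page_server->receive();

		/* A sketch only: a real caller would handle error responses here. */
		if (resp->tag == T_ZenithGetPageResponse)
			memcpy(buffers[i], ((ZenithGetPageResponse *) resp)->page, BLCKSZ);
		pfree(resp);
	}
}

This is the same pattern zenith_read_at_lsn() follows when n_prefetch_requests > 0, except that it defers the receive() calls until the prefetched pages are actually read, overlapping response transfer with page processing.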