diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 8107ba84b9..97bf207e74 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -269,6 +269,12 @@ lfc_ensure_opened(void) return true; } +bool +lfc_enabled(void) +{ + return !lfc_maybe_disabled(); +} + static void lfc_shmem_startup(void) { diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 1a2694b465..9e1fc7f116 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -16,6 +16,7 @@ #include "postgres.h" +#include "access/twophase.h" #include "access/xlog.h" #include "common/hashfn.h" #include "fmgr.h" @@ -48,11 +49,13 @@ #define PageStoreTrace DEBUG5 -#define MIN_RECONNECT_INTERVAL_USEC 1000 -#define MAX_RECONNECT_INTERVAL_USEC 1000000 -#define RECEIVER_RETRY_DELAY_USEC 1000000 -#define MAX_REQUEST_SIZE 1024 -#define MAX_PS_QUERY_LENGTH 256 +#define MIN_RECONNECT_INTERVAL_USEC 1000 +#define MAX_RECONNECT_INTERVAL_USEC 1000000 +#define RECEIVER_RETRY_DELAY_USEC 1000000 +#define MAX_REQUEST_SIZE 1024 +#define MAX_PS_QUERY_LENGTH 256 +#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts) + /* GUCs */ char *neon_timeline; @@ -62,7 +65,7 @@ char *page_server_connstring; char *neon_auth_token; int max_prefetch_distance = 128; -int parallel_connections = 10; +int parallel_connections = 1; int neon_protocol_version = 3; @@ -72,7 +75,7 @@ static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; #define CHAN_TO_SHARD(chan_no) ((chan_no) / parallel_connections) -void CommunicatorMain(Datum main_arg); +PGDLLEXPORT void CommunicatorMain(Datum main_arg); /* Produce error message in critical section for thgread safety */ #define neon_shard_log_cs(shard_no, tag, fmt, ...) do { \ @@ -107,8 +110,6 @@ int MyProcNumber; #define PG_IOV_MAX 32 #endif -static bool am_communicator = false; - typedef enum PSConnectionState { PS_Disconnected, /* no connection yet */ PS_Connecting_Startup, /* connection starting up */ @@ -235,12 +236,18 @@ ParseShardMap(const char *connstr, ShardMap *result) return true; } +static bool +IsCommunicatorProcess(void) +{ + return MyBgworkerEntry && strcmp(MyBgworkerEntry->bgw_function_name, "CommunicatorMain") == 0; +} + static bool CheckPageserverConnstring(char **newval, void **extra, GucSource source) { char *p = *newval; - return ParseShardMap(p, NULL); + return !IsCommunicatorProcess() || ParseShardMap(p, NULL); } static void @@ -252,7 +259,7 @@ AssignPageserverConnstring(const char *newval, void *extra) /* * Only communicator background worker estblish connections with page server and need this information */ - if (!am_communicator) + if (!IsCommunicatorProcess()) return; old_num_shards = shard_map.num_shards; @@ -272,7 +279,7 @@ AssignPageserverConnstring(const char *newval, void *extra) } /* Force to reestablish connection with old shards */ - for (size_t i = 0; i < old_num_shards; i++) + for (size_t i = 0; i < old_num_shards * parallel_connections; i++) { if (page_servers[i].state == PS_Connected) { @@ -282,7 +289,7 @@ AssignPageserverConnstring(const char *newval, void *extra) } /* Start workers for new shards */ - for (size_t i = old_num_shards; i < shard_map.num_shards; i++) + for (size_t i = old_num_shards * parallel_connections; i < shard_map.num_shards * parallel_connections; i++) { pthread_t reader, writer; void* chan_no = (void*)i; @@ -909,20 +916,15 @@ pageserver_receive(int chan_no) /* call_PQgetCopyData handles rc == 0 */ Assert(rc > 0); - /* FIXME: is it thread safe? */ - PG_TRY(); - { - resp_buff.len = rc; - resp_buff.cursor = 0; - resp = nm_unpack_response(&resp_buff); - PQfreemem(resp_buff.data); - } - PG_CATCH(); + resp_buff.len = rc; + resp_buff.cursor = 0; + resp = nm_unpack_response(&resp_buff); + PQfreemem(resp_buff.data); + if (resp == NULL) { neon_shard_log_cs(shard_no, LOG, "pageserver_receive: disconnect due to failure while parsing response"); pageserver_disconnect(chan_no); } - PG_END_TRY(); } else if (rc == -1) { @@ -955,16 +957,15 @@ check_neon_id(char **newval, void **extra, GucSource source) return **newval == '\0' || HexDecodeString(id, *newval, 16); } - /* * Each backend can send up to max_prefetch_distance prefetch requests and one vectored read request. - * Backends are splitted between parallel conntions so each worker has tpo server at most MaxBackends / parallel_connections + * Backends are splitted between parallel conntions so each worker has tpo server at most PROCARRAY_MAXPROCS / parallel_connections * backends. */ static Size RequestBufferSize(void) { - return (max_prefetch_distance + PG_IOV_MAX) * (MaxBackends + (parallel_connections - 1) / parallel_connections); + return (max_prefetch_distance + PG_IOV_MAX) * (PROCARRAY_MAXPROCS + (parallel_connections - 1) / parallel_connections); } static Size @@ -972,7 +973,7 @@ CommunicatorShmemSize(void) { return RequestBufferSize() * MaxNumberOfChannels() * sizeof(NeonCommunicatorRequest) + MaxNumberOfChannels() * sizeof(NeonCommunicatorChannel) - + sizeof(NeonCommunicatorResponse) * MaxBackends; + + sizeof(NeonCommunicatorResponse) * PROCARRAY_MAXPROCS; } static Size @@ -1062,7 +1063,7 @@ communicator_send_request(int shard, NeonCommunicatorRequest* req) /* bind backend to the particular channel */ NeonCommunicatorChannel* chan = &channels[shard * parallel_connections + (MyProcNumber % parallel_connections)]; size_t ring_size = RequestBufferSize(); - uint64 write_pos = pg_atomic_add_fetch_u64(&chan->write_pos, 1); /* reserve write position */ + uint64 write_pos = pg_atomic_fetch_add_u64(&chan->write_pos, 1); /* reserve write position */ uint64 read_pos; Assert(req->hdr.u.reqid == 0); /* ring overflow should not happen */ @@ -1099,6 +1100,7 @@ communicator_receive_response(void) WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1L, WAIT_EVENT_NEON_PS_READ); + ResetLatch(MyLatch); } if (responses[MyProcNumber].tag == T_NeonErrorResponse) { @@ -1121,8 +1123,6 @@ communicator_request(int shard, NeonCommunicatorRequest* req) void CommunicatorMain(Datum main_arg) { - am_communicator = true; - /* Establish signal handlers. */ pqsignal(SIGUSR1, procsignal_sigusr1_handler); pqsignal(SIGHUP, SignalHandlerForConfigReload); @@ -1135,6 +1135,7 @@ CommunicatorMain(Datum main_arg) WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1L, PG_WAIT_EXTENSION); + ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); } } @@ -1210,11 +1211,12 @@ pg_init_libpagestore(void) GUC_UNIT_MB, NULL, NULL, NULL); + /* FIXME: enforce that effective_io_concurrency and maintenance_io_concurrency can not be set larger than max_prefetch_distance */ DefineCustomIntVariable("neon.max_prefetch_distance", "Maximal number of prefetch requests", "effetive_io_concurrency and maintenance_io_concurrecy should not be larger than sthis value", &max_prefetch_distance, - 128, 16, 1024, + 128, 0, 1024, PGC_POSTMASTER, 0, /* no flags required */ NULL, NULL, NULL); @@ -1223,7 +1225,7 @@ pg_init_libpagestore(void) "number of connections to each shard", NULL, ¶llel_connections, - 10, 1, 16, + 1, 1, 16, PGC_POSTMASTER, 0, /* no flags required */ NULL, NULL, NULL); @@ -1279,10 +1281,11 @@ allocStringInfo(StringInfo s, size_t size) static void* communicator_write_loop(void* arg) { - uint64 read_start_pos = 0; - size_t chan_no = (size_t)arg; + uint64 read_start_pos = 0; + size_t chan_no = (size_t)arg; + int shard_no = CHAN_TO_SHARD(chan_no); NeonCommunicatorChannel* chan = &channels[chan_no]; - size_t ring_size = RequestBufferSize(); + size_t ring_size = RequestBufferSize(); StringInfoData s; allocStringInfo(&s, MAX_REQUEST_SIZE); @@ -1290,25 +1293,33 @@ communicator_write_loop(void* arg) while (true) { NeonCommunicatorRequest* req; - uint64 read_end_pos; /* Number of shards is decreased so this worker is not needed any more */ if (chan_no >= shard_map.num_shards * parallel_connections) - return NULL; - - read_end_pos = pg_atomic_read_u64(&chan->read_pos); - Assert(read_start_pos <= read_end_pos); - while (read_start_pos == read_end_pos) { - int events = WaitLatch(&chan->latch, WL_LATCH_SET|WL_POSTMASTER_DEATH, -1, WAIT_EVENT_NEON_PS_SEND); + neon_shard_log_cs(shard_no, LOG, "Shard %d is not online any more (num_shards=%d)", (int)shard_no, (int)shard_map.num_shards); + return NULL; + } + while (true) + { + int events; + uint64 read_end_pos = pg_atomic_read_u64(&chan->read_pos); + Assert(read_start_pos <= read_end_pos); + if (read_start_pos < read_end_pos) + { + break; + } + events = WaitLatch(&chan->latch, WL_LATCH_SET|WL_POSTMASTER_DEATH, -1, WAIT_EVENT_NEON_PS_SEND); if (events & WL_POSTMASTER_DEATH) return NULL; + ResetLatch(&chan->latch); } + elog(LOG, "Communicator %d receive request at %ld", (int)chan_no, (long)read_start_pos); req = &chan->requests[read_start_pos++ % ring_size]; nm_pack_request(&s, &req->hdr); Assert(s.maxlen == MAX_REQUEST_SIZE); /* string buffer was not reallocated */ - pageserver_send(chan_no, &s); req->hdr.u.reqid = 0; /* mark requests as processed */ + pageserver_send(chan_no, &s); } } @@ -1322,6 +1333,7 @@ communicator_read_loop(void* arg) int64 value = 0; size_t chan_no = (size_t)arg; int shard_no = CHAN_TO_SHARD(chan_no); + bool notify_backend = false; while (true) { @@ -1336,6 +1348,7 @@ communicator_read_loop(void* arg) pg_usleep(RECEIVER_RETRY_DELAY_USEC); continue; } + notify_backend = true; switch (resp->tag) { case T_NeonExistsResponse: @@ -1354,8 +1367,7 @@ communicator_read_loop(void* arg) { /* result of prefetch */ (void) lfc_prefetch(page_resp->req.rinfo, page_resp->req.forknum, page_resp->req.blkno, page_resp->page, resp->not_modified_since); - free(resp); - continue; /* should not notify backend */ + notify_backend = false; } else { @@ -1365,12 +1377,20 @@ communicator_read_loop(void* arg) InitBufferTag(&tag, &page_resp->req.rinfo, page_resp->req.forknum, page_resp->req.blkno); if (!BufferTagsEqual(&buf_desc->tag, &tag)) { - neon_shard_log_cs(shard_no, PANIC, "Get page request {rel=%u/%u/%u.%u block=%u} referecing wrpng buffer {rel=%u/%u/%u.%u block=%u}", + /* + * It can happen that backend was terminated before response was received fro page server. + * So doesn't treate this as error, just log and ignore response. + */ + neon_shard_log_cs(shard_no, LOG, "Get page request {rel=%u/%u/%u.%u block=%u} referencing wrong buffer {rel=%u/%u/%u.%u block=%u}", RelFileInfoFmt(page_resp->req.rinfo), page_resp->req.forknum, page_resp->req.blkno, RelFileInfoFmt(BufTagGetNRelFileInfo(buf_desc->tag)), buf_desc->tag.forkNum, buf_desc->tag.blockNum); + notify_backend = false; + } + else + { + /* Copy page content to shared buffer */ + memcpy(BufferGetBlock(resp->u.recepient.bufid), page_resp->page, BLCKSZ); } - /* Copy page content to shared buffer */ - memcpy(BufferGetBlock(resp->u.recepient.bufid), page_resp->page, BLCKSZ); } break; } @@ -1380,11 +1400,14 @@ communicator_read_loop(void* arg) default: break; } - responses[resp->u.recepient.procno].value = value; - /* enforce write barrier before writing response code which server as received response indicator */ - pg_write_barrier(); - responses[resp->u.recepient.procno].tag = resp->tag; - SetLatch(&ProcGlobal->allProcs[resp->u.recepient.procno].procLatch); + if (notify_backend) + { + responses[resp->u.recepient.procno].value = value; + /* enforce write barrier before writing response code which is used as received response indicator */ + pg_write_barrier(); + responses[resp->u.recepient.procno].tag = resp->tag; + SetLatch(&ProcGlobal->allProcs[resp->u.recepient.procno].procLatch); + } free(resp); } } diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 0570701644..cbef7eb3ee 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -301,6 +301,7 @@ extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, extern void lfc_init(void); extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, const void* buffer, XLogRecPtr lsn); +extern bool lfc_enabled(void); static inline bool diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index afc7a87fc7..c5b282af4d 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -337,7 +337,6 @@ nm_unpack_response(StringInfo s) case T_NeonDbSizeRequest: case T_NeonGetSlruSegmentRequest: default: - neon_log(ERROR, "unexpected neon message tag 0x%02x", tag); break; } @@ -1370,24 +1369,27 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } - - neon_get_request_lsns(InfoFromSMgrRel(reln), - forknum, blocknum, - request_lsns, nblocks, NULL); - for (int i = 0; i < nblocks; i++) + /* Prefetch result will be placed in LFC, so no need to send prefetch requests if LFC is disabled */ + if (lfc_enabled()) { - if (!lfc_cache_contains(InfoFromSMgrRel(reln), forknum, blocknum + i)) + neon_get_request_lsns(InfoFromSMgrRel(reln), + forknum, blocknum, + request_lsns, nblocks, NULL); + for (int i = 0; i < nblocks; i++) { - NeonCommunicatorRequest request = { - .page.hdr.tag = T_NeonGetPageRequest, - .page.hdr.u.recepient.bufid = InvalidBuffer, - .page.hdr.lsn = request_lsns[i].request_lsn, - .page.hdr.not_modified_since = request_lsns[i].not_modified_since, - .page.rinfo = InfoFromSMgrRel(reln), - .page.forknum = forknum, - .page.blkno = blocknum + i, - }; - (void)communicator_send_request(get_shard_number(InfoFromSMgrRel(reln), blocknum + i), &request); + if (!lfc_cache_contains(InfoFromSMgrRel(reln), forknum, blocknum + i)) + { + NeonCommunicatorRequest request = { + .page.hdr.tag = T_NeonGetPageRequest, + .page.hdr.u.recepient.bufid = InvalidBuffer, + .page.hdr.lsn = request_lsns[i].request_lsn, + .page.hdr.not_modified_since = request_lsns[i].not_modified_since, + .page.rinfo = InfoFromSMgrRel(reln), + .page.forknum = forknum, + .page.blkno = blocknum + i, + }; + (void)communicator_send_request(get_shard_number(InfoFromSMgrRel(reln), blocknum + i), &request); + } } } return false; @@ -1415,7 +1417,9 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence); } - if (!lfc_cache_contains(InfoFromSMgrRel(reln), forknum, blocknum)) + /* Prefetch result will be placed in LFC, so no need to send prefetch requests if LFC is disabled */ + if (lfc_enabled() + && !lfc_cache_contains(InfoFromSMgrRel(reln), forknum, blocknum)) { neon_request_lsns request_lsns; neon_get_request_lsns(InfoFromSMgrRel(reln),