diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c
index 9e1fc7f116..5ff53a2327 100644
--- a/pgxn/neon/libpagestore.c
+++ b/pgxn/neon/libpagestore.c
@@ -73,6 +73,8 @@ static int stripe_size;
 
 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
 
+static bool am_communicator;
+
 #define CHAN_TO_SHARD(chan_no) ((chan_no) / parallel_connections)
 
 PGDLLEXPORT void CommunicatorMain(Datum main_arg);
@@ -89,7 +91,7 @@ PGDLLEXPORT void CommunicatorMain(Datum main_arg);
 typedef struct
 {
 	char		connstring[MAX_SHARDS][MAX_PAGESERVER_CONNSTRING_SIZE];
-	size_t		num_shards;
+	size_t		n_shards;
 } ShardMap;
 
 static ShardMap shard_map;
@@ -99,9 +101,6 @@ static shmem_request_hook_type prev_shmem_request_hook = NULL;
 #endif
 static shmem_startup_hook_type prev_shmem_startup_hook;
 
-static NeonCommunicatorResponse* responses; /* for each backend */
-static NeonCommunicatorChannel* channels;
-
 #if PG_VERSION_NUM < 170000
 int			MyProcNumber;
 #endif
@@ -151,6 +150,16 @@ typedef struct
 
 static PageServer* page_servers;
 
+
+typedef struct
+{
+	size_t		n_shards;
+	NeonCommunicatorChannel* channels;
+	NeonCommunicatorResponse* responses; /* for each backend */
+} NeonCommunicatorSharedState;
+
+static NeonCommunicatorSharedState* communicator;
+
 static void pageserver_disconnect(int chan_no);
 
 
@@ -190,14 +199,12 @@ MaxNumberOfChannels(void)
 static bool
 ParseShardMap(const char *connstr, ShardMap *result)
 {
-	const char *p;
-	int			nshards = 0;
+	const char *p = connstr;
+	int			n_shards = 0;
 
 	if (result)
 		memset(result, 0, sizeof(ShardMap));
 
-	p = connstr;
-	nshards = 0;
 	for (;;)
 	{
 		const char *sep;
@@ -209,7 +216,7 @@ ParseShardMap(const char *connstr, ShardMap *result)
 		if (connstr_len == 0 && sep == NULL)
 			break;				/* ignore trailing comma */
-		if (nshards >= MAX_SHARDS)
+		if (n_shards >= MAX_SHARDS)
 		{
 			neon_log(LOG, "Too many shards");
 			return false;
@@ -221,48 +228,40 @@ ParseShardMap(const char *connstr, ShardMap *result)
 		}
 		if (result)
 		{
-			memcpy(result->connstring[nshards], p, connstr_len);
-			result->connstring[nshards][connstr_len] = '\0';
+			memcpy(result->connstring[n_shards], p, connstr_len);
+			result->connstring[n_shards][connstr_len] = '\0';
 		}
-		nshards++;
+		n_shards++;
 		if (sep == NULL)
 			break;
 		p = sep + 1;
 	}
 	if (result)
-		result->num_shards = nshards;
+		/* also publish the count to shared state so backends route requests against the fresh map */
+		communicator->n_shards = result->n_shards = n_shards;
 	return true;
 }
 
-static bool
-IsCommunicatorProcess(void)
-{
-	return MyBgworkerEntry && strcmp(MyBgworkerEntry->bgw_function_name, "CommunicatorMain") == 0;
-}
-
 static bool
 CheckPageserverConnstring(char **newval, void **extra, GucSource source)
 {
-	char *p = *newval;
-
-	return !IsCommunicatorProcess() || ParseShardMap(p, NULL);
+	return ParseShardMap(*newval, NULL);
 }
 
 static void
 AssignPageserverConnstring(const char *newval, void *extra)
 {
-	ShardMap shard_map;
-	size_t old_num_shards;
+	size_t old_n_shards;
 
 	/*
-	 * Only communicator background worker estblish connections with page server and need this information
+	 * Only the communicator background worker establishes connections with the page server and needs this information
 	 */
-	if (!IsCommunicatorProcess())
+	if (!am_communicator)
 		return;
 
-	old_num_shards = shard_map.num_shards;
+	old_n_shards = shard_map.n_shards;
 
 	if (!ParseShardMap(newval, &shard_map))
 	{
@@ -279,7 +277,7 @@ AssignPageserverConnstring(const char *newval, void *extra)
 	}
 
 	/* Force to reestablish connection with old shards */
-	for (size_t i = 0; i < old_num_shards * parallel_connections; i++)
+	for (size_t i = 0; i < old_n_shards * parallel_connections; i++)
 	{
 		if (page_servers[i].state == PS_Connected)
 		{
@@ -289,7 +287,7 @@ AssignPageserverConnstring(const char *newval, void *extra)
 	}
 
 	/* Start workers for new shards */
-	for (size_t i = old_num_shards * parallel_connections; i < shard_map.num_shards * parallel_connections; i++)
+	for (size_t i = old_n_shards * parallel_connections; i < shard_map.n_shards * parallel_connections; i++)
 	{
 		pthread_t reader, writer;
 		void* chan_no = (void*)i;
@@ -311,7 +309,7 @@ get_shard_number(NRelFileInfo rinfo, BlockNumber blocknum)
 	hash = hash_combine(hash, murmurhash32(blocknum / stripe_size));
 #endif
 
-	return hash % shard_map.num_shards;
+	return hash % communicator->n_shards;
 }
 
 /*
@@ -336,7 +334,7 @@ cleanup_and_disconnect(PageServer *ps)
 
 	ps->state = PS_Disconnected;
 
-	pthread_mutex_lock(&mutex);
+	pthread_mutex_unlock(&mutex);
 }
 
 /*
@@ -971,7 +969,8 @@ RequestBufferSize(void)
 static Size
 CommunicatorShmemSize(void)
 {
-	return RequestBufferSize() * MaxNumberOfChannels() * sizeof(NeonCommunicatorRequest)
+	return sizeof(NeonCommunicatorSharedState)
+		+ RequestBufferSize() * MaxNumberOfChannels() * sizeof(NeonCommunicatorRequest)
 		+ MaxNumberOfChannels() * sizeof(NeonCommunicatorChannel)
 		+ sizeof(NeonCommunicatorResponse) * PROCARRAY_MAXPROCS;
 }
@@ -992,24 +991,37 @@ PagestoreShmemInit(void)
 #endif
 	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
-	channels = ShmemInitStruct("communicator shared state",
-							   CommunicatorShmemSize(),
-							   &found);
+	communicator = ShmemInitStruct("communicator shared state",
+								   CommunicatorShmemSize(),
+								   &found);
 	if (!found)
 	{
 		size_t n_channels = MaxNumberOfChannels();
-		NeonCommunicatorRequest* requests = (NeonCommunicatorRequest*)(channels + n_channels);
+		NeonCommunicatorRequest* requests;
+		pthread_condattr_t attrcond;
+		pthread_mutexattr_t attrmutex;
+
+		/* the mutexes and condvars live in shared memory and are used across process boundaries */
+		pthread_condattr_init(&attrcond);
+		pthread_condattr_setpshared(&attrcond, PTHREAD_PROCESS_SHARED);
+
+		pthread_mutexattr_init(&attrmutex);
+		pthread_mutexattr_setpshared(&attrmutex, PTHREAD_PROCESS_SHARED);
+
+		communicator->channels = (NeonCommunicatorChannel*)(communicator + 1);
+		requests = (NeonCommunicatorRequest*)(communicator->channels + n_channels);
 		for (size_t i = 0; i < n_channels; i++)
 		{
-			NeonCommunicatorChannel* chan = &channels[i];
+			NeonCommunicatorChannel* chan = &communicator->channels[i];
 			pg_atomic_init_u64(&chan->write_pos, 0);
 			pg_atomic_init_u64(&chan->read_pos, 0);
-			InitLatch(&chan->latch);
+			pthread_cond_init(&chan->cond, &attrcond);
+			pthread_mutex_init(&chan->mutex, &attrmutex);
 			chan->requests = requests;
 			requests += RequestBufferSize();
 		}
-		responses = (NeonCommunicatorResponse*)requests;
+		communicator->responses = (NeonCommunicatorResponse*)requests;
+		communicator->n_shards = shard_map.n_shards;
 	}
 	NeonPerfCountersShmemInit();
@@ -1061,19 +1072,22 @@ void
 communicator_send_request(int shard, NeonCommunicatorRequest* req)
 {
 	/* bind backend to the particular channel */
-	NeonCommunicatorChannel* chan = &channels[shard * parallel_connections + (MyProcNumber % parallel_connections)];
+	NeonCommunicatorChannel* chan = &communicator->channels[shard * parallel_connections + (MyProcNumber % parallel_connections)];
 	size_t ring_size = RequestBufferSize();
 	uint64 write_pos = pg_atomic_fetch_add_u64(&chan->write_pos, 1); /* reserve write position */
+	size_t ring_pos = (size_t)(write_pos % ring_size);
 	uint64 read_pos;
 
-	Assert(req->hdr.u.reqid == 0); /* ring overflow should not happen */
+	/* ring overflow should not happen */
+	Assert(chan->requests[ring_pos].hdr.u.reqid == 0);
+
 	req->hdr.u.recepient.procno = MyProcNumber;
 
 	/* copy request */
-	chan->requests[(size_t)(write_pos % ring_size)] = *req;
+	chan->requests[ring_pos] = *req;
 
-	/* will be overwritten with response code when request will be processed */
-	responses[MyProcNumber].tag = req->hdr.tag;
+	/* will be overwritten with the response code when the request is processed */
+	communicator->responses[MyProcNumber].tag = req->hdr.tag;
 
-	/* enforce memory battier before pinging communicator */
+	/* enforce memory barrier before pinging the communicator */
 	pg_write_barrier();
@@ -1083,7 +1097,10 @@ communicator_send_request(int shard, NeonCommunicatorRequest* req)
 		read_pos = write_pos;
 	} while (!pg_atomic_compare_exchange_u64(&chan->read_pos, &read_pos, write_pos+1));
 
-	SetLatch(&chan->latch);
+	/* signal under the mutex so the worker cannot miss a wakeup between its position check and pthread_cond_wait() */
+	pthread_mutex_lock(&chan->mutex);
+	pthread_cond_signal(&chan->cond);
+	pthread_mutex_unlock(&chan->mutex);
 }
 
 /*
@@ -1094,7 +1108,7 @@ communicator_send_request(int shard, NeonCommunicatorRequest* req)
 int64
 communicator_receive_response(void)
 {
-	while (responses[MyProcNumber].tag <= T_NeonTestRequest) /* response not yet received */
+	while (communicator->responses[MyProcNumber].tag <= T_NeonTestRequest) /* response not yet received */
 	{
 		(void) WaitLatch(MyLatch,
 						 WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
@@ -1102,11 +1116,12 @@ communicator_receive_response(void)
 						 WAIT_EVENT_NEON_PS_READ);
 		ResetLatch(MyLatch);
 	}
-	if (responses[MyProcNumber].tag == T_NeonErrorResponse)
+	if (communicator->responses[MyProcNumber].tag == T_NeonErrorResponse)
 	{
 		elog(ERROR, "Request failed"); /* detailed error message is printed by communicator */
 	}
-	return responses[MyProcNumber].value;
+	elog(LOG, "Backend %d received response %d from communicator", MyProcNumber, communicator->responses[MyProcNumber].tag);
+	return communicator->responses[MyProcNumber].value;
 }
 
 /*
@@ -1120,15 +1135,19 @@ communicator_request(int shard, NeonCommunicatorRequest* req)
 }
 
 
-void
+PGDLLEXPORT void
 CommunicatorMain(Datum main_arg)
 {
+	am_communicator = true;
+
 	/* Establish signal handlers. */
 	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
 	pqsignal(SIGHUP, SignalHandlerForConfigReload);
 	pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
 	BackgroundWorkerUnblockSignals();
 
+	AssignPageserverConnstring(page_server_connstring, NULL);
+
 	while (!ShutdownRequestPending)
 	{
 		(void) WaitLatch(MyLatch,
@@ -1137,6 +1156,11 @@ CommunicatorMain(Datum main_arg)
 						 PG_WAIT_EXTENSION);
 		ResetLatch(MyLatch);
 		CHECK_FOR_INTERRUPTS();
+		if (ConfigReloadPending)
+		{
+			ConfigReloadPending = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
 	}
 }
 
@@ -1284,7 +1308,7 @@ communicator_write_loop(void* arg)
 	uint64 read_start_pos = 0;
 	size_t chan_no = (size_t)arg;
 	int shard_no = CHAN_TO_SHARD(chan_no);
-	NeonCommunicatorChannel* chan = &channels[chan_no];
+	NeonCommunicatorChannel* chan = &communicator->channels[chan_no];
 	size_t ring_size = RequestBufferSize();
 	StringInfoData s;
 
@@ -1295,29 +1319,31 @@ communicator_write_loop(void* arg)
 		NeonCommunicatorRequest* req;
 
 		/* Number of shards is decreased so this worker is not needed any more */
-		if (chan_no >= shard_map.num_shards * parallel_connections)
+		if (chan_no >= shard_map.n_shards * parallel_connections)
 		{
-			neon_shard_log_cs(shard_no, LOG, "Shard %d is not online any more (num_shards=%d)", (int)shard_no, (int)shard_map.num_shards);
+			neon_shard_log_cs(shard_no, LOG, "Shard %d is no longer online (n_shards=%d)", (int)shard_no, (int)shard_map.n_shards);
 			return NULL;
 		}
 		while (true)
 		{
-			int events;
 			uint64 read_end_pos = pg_atomic_read_u64(&chan->read_pos);
 			Assert(read_start_pos <= read_end_pos);
 			if (read_start_pos < read_end_pos)
 			{
 				break;
 			}
-			events = WaitLatch(&chan->latch, WL_LATCH_SET|WL_POSTMASTER_DEATH, -1, WAIT_EVENT_NEON_PS_SEND);
-			if (events & WL_POSTMASTER_DEATH)
-				return NULL;
-			ResetLatch(&chan->latch);
+			pthread_mutex_lock(&chan->mutex);
+			/* recheck under the mutex: a signal sent between the check above and this wait must not be lost */
+			if (pg_atomic_read_u64(&chan->read_pos) == read_start_pos)
+				pthread_cond_wait(&chan->cond, &chan->mutex);
+			pthread_mutex_unlock(&chan->mutex);
+			if (ShutdownRequestPending)
+				return NULL;
 		}
-		elog(LOG, "Communicator %d receive request at %ld", (int)chan_no, (long)read_start_pos);
 		req = &chan->requests[read_start_pos++ % ring_size];
 		nm_pack_request(&s, &req->hdr);
 		Assert(s.maxlen == MAX_REQUEST_SIZE); /* string buffer was not reallocated */
+		elog(LOG, "Send request %d from backend %d to PS", req->hdr.tag, req->hdr.u.recepient.procno);
 		req->hdr.u.reqid = 0; /* mark requests as processed */
 		pageserver_send(chan_no, &s);
 	}
@@ -1338,7 +1362,7 @@ communicator_read_loop(void* arg)
 	while (true)
 	{
 		/* Number of shards is decreased */
-		if (chan_no >= shard_map.num_shards * parallel_connections)
+		if (chan_no >= shard_map.n_shards * parallel_connections)
 			return NULL;
 
 		resp = pageserver_receive(chan_no);
@@ -1348,6 +1372,7 @@ communicator_read_loop(void* arg)
 			pg_usleep(RECEIVER_RETRY_DELAY_USEC);
 			continue;
 		}
+		elog(LOG, "Received response %d from PS for backend %d on channel %d", resp->tag, resp->u.recepient.procno, (int)chan_no);
 		notify_backend = true;
 		switch (resp->tag)
 		{
@@ -1366,6 +1391,7 @@ communicator_read_loop(void* arg)
 				if (resp->u.recepient.bufid == InvalidBuffer)
 				{
 					/* result of prefetch */
+					Assert(false);
 					(void) lfc_prefetch(page_resp->req.rinfo, page_resp->req.forknum, page_resp->req.blkno, page_resp->page, resp->not_modified_since);
 					notify_backend = false;
 				}
@@ -1402,10 +1428,10 @@ communicator_read_loop(void* arg)
 		}
 		if (notify_backend)
 		{
-			responses[resp->u.recepient.procno].value = value;
+			communicator->responses[resp->u.recepient.procno].value = value;
 			/* enforce write barrier before writing response code which is used as received response indicator */
 			pg_write_barrier();
-			responses[resp->u.recepient.procno].tag = resp->tag;
+			communicator->responses[resp->u.recepient.procno].tag = resp->tag;
 			SetLatch(&ProcGlobal->allProcs[resp->u.recepient.procno].procLatch);
 		}
 		free(resp);
diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h
index cbef7eb3ee..7119b1f3cc 100644
--- a/pgxn/neon/pagestore_client.h
+++ b/pgxn/neon/pagestore_client.h
@@ -167,7 +167,8 @@ typedef struct
 {
 	pg_atomic_uint64 write_pos;
 	pg_atomic_uint64 read_pos;
-	Latch		latch;
+	pthread_mutex_t mutex;
+	pthread_cond_t	cond;
 	NeonCommunicatorRequest* requests;
 } NeonCommunicatorChannel;
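
A note on the synchronization pattern this patch introduces: the per-channel Latch is replaced by a pthread mutex/condition-variable pair that lives in shared memory and is initialized with PTHREAD_PROCESS_SHARED, so a backend process can wake a communicator thread across process boundaries. Below is a minimal standalone sketch of that handshake, not part of the patch; Channel and its write_pos counter are simplified, hypothetical stand-ins for NeonCommunicatorChannel and its atomic ring positions.

/*
 * Sketch: process-shared mutex/condvar handshake (POSIX; build with cc -pthread).
 */
#include <pthread.h>
#include <stdio.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <unistd.h>

typedef struct
{
	pthread_mutex_t mutex;
	pthread_cond_t	cond;
	unsigned long	write_pos;	/* stand-in for the channel's atomic ring positions */
} Channel;

int
main(void)
{
	/* place the channel in shared memory, as the extension does with ShmemInitStruct() */
	Channel    *chan = mmap(NULL, sizeof(Channel), PROT_READ | PROT_WRITE,
							MAP_SHARED | MAP_ANONYMOUS, -1, 0);
	pthread_mutexattr_t mattr;
	pthread_condattr_t cattr;

	if (chan == MAP_FAILED)
		return 1;

	/* PTHREAD_PROCESS_SHARED is what makes the pair usable across processes */
	pthread_mutexattr_init(&mattr);
	pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
	pthread_mutex_init(&chan->mutex, &mattr);

	pthread_condattr_init(&cattr);
	pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
	pthread_cond_init(&chan->cond, &cattr);

	chan->write_pos = 0;

	if (fork() == 0)
	{
		/* "communicator" side: check the predicate under the mutex, then sleep */
		pthread_mutex_lock(&chan->mutex);
		while (chan->write_pos == 0)
			pthread_cond_wait(&chan->cond, &chan->mutex);
		pthread_mutex_unlock(&chan->mutex);
		printf("woke up, write_pos=%lu\n", chan->write_pos);
		_exit(0);
	}

	/* "backend" side: publish the request and signal under the same mutex */
	pthread_mutex_lock(&chan->mutex);
	chan->write_pos++;
	pthread_cond_signal(&chan->cond);
	pthread_mutex_unlock(&chan->mutex);

	wait(NULL);
	return 0;
}

Holding the mutex around both the predicate check and the signal is what closes the lost-wakeup window; the patch achieves the same effect by rechecking read_pos under chan->mutex before pthread_cond_wait() in communicator_write_loop() and signaling with the mutex held in communicator_send_request().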