Use pthread cond instead of Postgres latch for notification of coordinator

Konstantin Knizhnik
2025-02-18 19:19:32 +02:00
parent 914cc529ba
commit 426101e38f
2 changed files with 87 additions and 60 deletions
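A Postgres Latch is owned by a single process and woken through per-process signal machinery, which is presumably why notification of the communicator's per-channel worker pthreads is moved here to a PTHREAD_PROCESS_SHARED mutex/condition-variable pair living in shared memory: such a condvar can be signalled by any backend and waited on by any thread of the communicator process. Below is a minimal sketch of that pattern only, not code from this patch; the names shm_channel_t and channel_init are illustrative, and the real channel (see the hunks that follow) also carries pg_atomic_uint64 ring positions and a request buffer.

#include <pthread.h>

typedef struct
{
	pthread_mutex_t mutex;
	pthread_cond_t  cond;
	unsigned long   read_pos;   /* stand-in for the atomic ring positions */
} shm_channel_t;

/* Call once, on memory that lives in a shared-memory segment. */
static void
channel_init(shm_channel_t *chan)
{
	pthread_mutexattr_t mattr;
	pthread_condattr_t  cattr;

	/* process-shared attributes let other processes lock/wait on these objects */
	pthread_mutexattr_init(&mattr);
	pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
	pthread_mutex_init(&chan->mutex, &mattr);

	pthread_condattr_init(&cattr);
	pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
	pthread_cond_init(&chan->cond, &cattr);

	chan->read_pos = 0;
}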

View File

@@ -73,6 +73,8 @@ static int stripe_size;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static bool am_communicator;
#define CHAN_TO_SHARD(chan_no) ((chan_no) / parallel_connections)
PGDLLEXPORT void CommunicatorMain(Datum main_arg);
@@ -89,7 +91,7 @@ PGDLLEXPORT void CommunicatorMain(Datum main_arg);
typedef struct
{
char connstring[MAX_SHARDS][MAX_PAGESERVER_CONNSTRING_SIZE];
size_t num_shards;
size_t n_shards;
} ShardMap;
static ShardMap shard_map;
@@ -99,9 +101,6 @@ static shmem_request_hook_type prev_shmem_request_hook = NULL;
#endif
static shmem_startup_hook_type prev_shmem_startup_hook;
static NeonCommunicatorResponse* responses; /* for each backend */
static NeonCommunicatorChannel* channels;
#if PG_VERSION_NUM < 170000
int MyProcNumber;
#endif
@@ -151,6 +150,16 @@ typedef struct
static PageServer* page_servers;
typedef struct
{
size_t n_shards;
NeonCommunicatorChannel* channels;
NeonCommunicatorResponse* responses; /* for each backend */
} NeonCommunicatorSharedState;
static NeonCommunicatorSharedState* communicator;
static void pageserver_disconnect(int chan_no);
@@ -190,14 +199,12 @@ MaxNumberOfChannels(void)
static bool
ParseShardMap(const char *connstr, ShardMap *result)
{
const char *p;
int nshards = 0;
const char *p = connstr;
int n_shards = 0;
if (result)
memset(result, 0, sizeof(ShardMap));
p = connstr;
nshards = 0;
for (;;)
{
const char *sep;
@@ -209,7 +216,7 @@ ParseShardMap(const char *connstr, ShardMap *result)
if (connstr_len == 0 && sep == NULL)
break; /* ignore trailing comma */
if (nshards >= MAX_SHARDS)
if (n_shards >= MAX_SHARDS)
{
neon_log(LOG, "Too many shards");
return false;
@@ -221,48 +228,39 @@ ParseShardMap(const char *connstr, ShardMap *result)
}
if (result)
{
memcpy(result->connstring[nshards], p, connstr_len);
result->connstring[nshards][connstr_len] = '\0';
memcpy(result->connstring[n_shards], p, connstr_len);
result->connstring[n_shards][connstr_len] = '\0';
}
nshards++;
n_shards++;
if (sep == NULL)
break;
p = sep + 1;
}
if (result)
result->num_shards = nshards;
communicator->n_shards = result->n_shards = n_shards;
return true;
}
static bool
IsCommunicatorProcess(void)
{
return MyBgworkerEntry && strcmp(MyBgworkerEntry->bgw_function_name, "CommunicatorMain") == 0;
}
static bool
CheckPageserverConnstring(char **newval, void **extra, GucSource source)
{
char *p = *newval;
return !IsCommunicatorProcess() || ParseShardMap(p, NULL);
return ParseShardMap(*newval, NULL);
}
static void
AssignPageserverConnstring(const char *newval, void *extra)
{
ShardMap shard_map;
size_t old_num_shards;
size_t old_n_shards;
/*
* Only the communicator background worker establishes connections with the page server and needs this information
*/
if (!IsCommunicatorProcess())
if (!am_communicator)
return;
old_num_shards = shard_map.num_shards;
old_n_shards = shard_map.n_shards;
if (!ParseShardMap(newval, &shard_map))
{
@@ -279,7 +277,7 @@ AssignPageserverConnstring(const char *newval, void *extra)
}
/* Force reestablishing connections with the old shards */
for (size_t i = 0; i < old_num_shards * parallel_connections; i++)
for (size_t i = 0; i < old_n_shards * parallel_connections; i++)
{
if (page_servers[i].state == PS_Connected)
{
@@ -289,7 +287,7 @@ AssignPageserverConnstring(const char *newval, void *extra)
}
/* Start workers for new shards */
for (size_t i = old_num_shards * parallel_connections; i < shard_map.num_shards * parallel_connections; i++)
for (size_t i = old_n_shards * parallel_connections; i < shard_map.n_shards * parallel_connections; i++)
{
pthread_t reader, writer;
void* chan_no = (void*)i;
@@ -311,7 +309,7 @@ get_shard_number(NRelFileInfo rinfo, BlockNumber blocknum)
hash = hash_combine(hash, murmurhash32(blocknum / stripe_size));
#endif
return hash % shard_map.num_shards;
return hash % communicator->n_shards;
}
/*
@@ -336,7 +334,7 @@ cleanup_and_disconnect(PageServer *ps)
ps->state = PS_Disconnected;
pthread_mutex_lock(&mutex);
pthread_mutex_unlock(&mutex);
}
/*
@@ -971,7 +969,8 @@ RequestBufferSize(void)
static Size
CommunicatorShmemSize(void)
{
return RequestBufferSize() * MaxNumberOfChannels() * sizeof(NeonCommunicatorRequest)
return sizeof(NeonCommunicatorSharedState)
+ RequestBufferSize() * MaxNumberOfChannels() * sizeof(NeonCommunicatorRequest)
+ MaxNumberOfChannels() * sizeof(NeonCommunicatorChannel)
+ sizeof(NeonCommunicatorResponse) * PROCARRAY_MAXPROCS;
}
@@ -992,24 +991,36 @@ PagestoreShmemInit(void)
#endif
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
channels = ShmemInitStruct("communicator shared state",
CommunicatorShmemSize(),
&found);
communicator = ShmemInitStruct("communicator shared state",
CommunicatorShmemSize(),
&found);
if (!found)
{
size_t n_channels = MaxNumberOfChannels();
NeonCommunicatorRequest* requests = (NeonCommunicatorRequest*)(channels + n_channels);
NeonCommunicatorRequest* requests;
pthread_condattr_t attrcond;
pthread_mutexattr_t attrmutex;
pthread_condattr_init(&attrcond);
pthread_condattr_setpshared(&attrcond, PTHREAD_PROCESS_SHARED);
pthread_mutexattr_init(&attrmutex);
pthread_mutexattr_setpshared(&attrmutex, PTHREAD_PROCESS_SHARED);
communicator->channels = (NeonCommunicatorChannel*)(communicator + 1);
requests = (NeonCommunicatorRequest*)(communicator->channels + n_channels);
for (size_t i = 0; i < n_channels; i++)
{
NeonCommunicatorChannel* chan = &channels[i];
NeonCommunicatorChannel* chan = &communicator->channels[i];
pg_atomic_init_u64(&chan->write_pos, 0);
pg_atomic_init_u64(&chan->read_pos, 0);
InitLatch(&chan->latch);
pthread_cond_init(&chan->cond, &attrcond);
pthread_mutex_init(&chan->mutex, &attrmutex);
chan->requests = requests;
requests += RequestBufferSize();
}
responses = (NeonCommunicatorResponse*)requests;
communicator->responses = (NeonCommunicatorResponse*)requests;
communicator->n_shards = shard_map.n_shards;
}
NeonPerfCountersShmemInit();
@@ -1061,19 +1072,22 @@ void
communicator_send_request(int shard, NeonCommunicatorRequest* req)
{
/* bind backend to the particular channel */
NeonCommunicatorChannel* chan = &channels[shard * parallel_connections + (MyProcNumber % parallel_connections)];
NeonCommunicatorChannel* chan = &communicator->channels[shard * parallel_connections + (MyProcNumber % parallel_connections)];
size_t ring_size = RequestBufferSize();
uint64 write_pos = pg_atomic_fetch_add_u64(&chan->write_pos, 1); /* reserve write position */
size_t ring_pos = (size_t)(write_pos % ring_size);
uint64 read_pos;
Assert(req->hdr.u.reqid == 0); /* ring overflow should not happen */
/* ring overflow should not happen */
Assert(chan->requests[ring_pos].hdr.u.reqid == 0);
req->hdr.u.recepient.procno = MyProcNumber;
/* copy request */
chan->requests[(size_t)(write_pos % ring_size)] = *req;
chan->requests[ring_pos] = *req;
/* will be overwritten with the response code when the request is processed */
responses[MyProcNumber].tag = req->hdr.tag;
communicator->responses[MyProcNumber].tag = req->hdr.tag;
/* enforce memory barrier before pinging the communicator */
pg_write_barrier();
@@ -1083,7 +1097,7 @@ communicator_send_request(int shard, NeonCommunicatorRequest* req)
read_pos = write_pos;
} while (!pg_atomic_compare_exchange_u64(&chan->read_pos, &read_pos, write_pos+1));
SetLatch(&chan->latch);
pthread_cond_signal(&chan->cond);
}
/*
@@ -1094,7 +1108,7 @@ communicator_send_request(int shard, NeonCommunicatorRequest* req)
int64
communicator_receive_response(void)
{
while (responses[MyProcNumber].tag <= T_NeonTestRequest) /* response not yet received */
while (communicator->responses[MyProcNumber].tag <= T_NeonTestRequest) /* response not yet received */
{
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
@@ -1102,11 +1116,12 @@ communicator_receive_response(void)
WAIT_EVENT_NEON_PS_READ);
ResetLatch(MyLatch);
}
if (responses[MyProcNumber].tag == T_NeonErrorResponse)
if (communicator->responses[MyProcNumber].tag == T_NeonErrorResponse)
{
elog(ERROR, "Request failed"); /* detailed error message is printed by communicator */
}
return responses[MyProcNumber].value;
elog(LOG, "Backend %d receive response %d from communicator", MyProcNumber, communicator->responses[MyProcNumber].tag);
return communicator->responses[MyProcNumber].value;
}
/*
@@ -1120,15 +1135,19 @@ communicator_request(int shard, NeonCommunicatorRequest* req)
}
void
PGDLLEXPORT void
CommunicatorMain(Datum main_arg)
{
am_communicator = true;
/* Establish signal handlers. */
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
BackgroundWorkerUnblockSignals();
AssignPageserverConnstring(page_server_connstring, NULL);
while (!ShutdownRequestPending)
{
(void) WaitLatch(MyLatch,
@@ -1137,6 +1156,11 @@ CommunicatorMain(Datum main_arg)
PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
}
}
}
@@ -1284,7 +1308,7 @@ communicator_write_loop(void* arg)
uint64 read_start_pos = 0;
size_t chan_no = (size_t)arg;
int shard_no = CHAN_TO_SHARD(chan_no);
NeonCommunicatorChannel* chan = &channels[chan_no];
NeonCommunicatorChannel* chan = &communicator->channels[chan_no];
size_t ring_size = RequestBufferSize();
StringInfoData s;
@@ -1295,29 +1319,29 @@ communicator_write_loop(void* arg)
NeonCommunicatorRequest* req;
/* Number of shards was decreased, so this worker is no longer needed */
if (chan_no >= shard_map.num_shards * parallel_connections)
if (chan_no >= shard_map.n_shards * parallel_connections)
{
neon_shard_log_cs(shard_no, LOG, "Shard %d is not online any more (num_shards=%d)", (int)shard_no, (int)shard_map.num_shards);
neon_shard_log_cs(shard_no, LOG, "Shard %d is not online any more (n_shards=%d)", (int)shard_no, (int)shard_map.n_shards);
return NULL;
}
while (true)
{
int events;
uint64 read_end_pos = pg_atomic_read_u64(&chan->read_pos);
Assert(read_start_pos <= read_end_pos);
if (read_start_pos < read_end_pos)
{
break;
}
events = WaitLatch(&chan->latch, WL_LATCH_SET|WL_POSTMASTER_DEATH, -1, WAIT_EVENT_NEON_PS_SEND);
if (events & WL_POSTMASTER_DEATH)
return NULL;
ResetLatch(&chan->latch);
pthread_mutex_lock(&chan->mutex);
pthread_cond_wait(&chan->cond, &chan->mutex);
pthread_mutex_unlock(&chan->mutex);
if (ShutdownRequestPending)
return NULL;
}
elog(LOG, "Communicator %d receive request at %ld", (int)chan_no, (long)read_start_pos);
req = &chan->requests[read_start_pos++ % ring_size];
nm_pack_request(&s, &req->hdr);
Assert(s.maxlen == MAX_REQUEST_SIZE); /* string buffer was not reallocated */
elog(LOG, "Send request %d from %d to PS", req->hdr.tag, req->hdr.u.recepient.procno);
req->hdr.u.reqid = 0; /* mark request as processed */
pageserver_send(chan_no, &s);
}
@@ -1338,7 +1362,7 @@ communicator_read_loop(void* arg)
while (true)
{
/* Number of shards was decreased */
if (chan_no >= shard_map.num_shards * parallel_connections)
if (chan_no >= shard_map.n_shards * parallel_connections)
return NULL;
resp = pageserver_receive(chan_no);
@@ -1348,6 +1372,7 @@ communicator_read_loop(void* arg)
pg_usleep(RECEIVER_RETRY_DELAY_USEC);
continue;
}
elog(LOG, "Receive response %d from PS to %d channel %d", resp->tag, resp->u.recepient.procno, (int)chan_no);
notify_backend = true;
switch (resp->tag)
{
@@ -1366,6 +1391,7 @@ communicator_read_loop(void* arg)
if (resp->u.recepient.bufid == InvalidBuffer)
{
/* result of prefetch */
Assert(false);
(void) lfc_prefetch(page_resp->req.rinfo, page_resp->req.forknum, page_resp->req.blkno, page_resp->page, resp->not_modified_since);
notify_backend = false;
}
@@ -1402,10 +1428,10 @@ communicator_read_loop(void* arg)
}
if (notify_backend)
{
responses[resp->u.recepient.procno].value = value;
communicator->responses[resp->u.recepient.procno].value = value;
/* enforce write barrier before writing the response code, which is used as the response-received indicator */
pg_write_barrier();
responses[resp->u.recepient.procno].tag = resp->tag;
communicator->responses[resp->u.recepient.procno].tag = resp->tag;
SetLatch(&ProcGlobal->allProcs[resp->u.recepient.procno].procLatch);
}
free(resp);

View File

@@ -167,7 +167,8 @@ typedef struct
{
pg_atomic_uint64 write_pos;
pg_atomic_uint64 read_pos;
Latch latch;
pthread_mutex_t mutex;
pthread_cond_t cond;
NeonCommunicatorRequest* requests;
} NeonCommunicatorChannel;
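For reference, the textbook wait/notify discipline around a channel like this looks roughly as sketched below, continuing the illustrative shm_channel_t from the sketch near the top of this page (the patch itself keeps the positions in pg_atomic_uint64 counters and signals after a compare-and-exchange on read_pos rather than under the mutex). Re-checking the predicate while holding the mutex is what prevents a wakeup from being lost between the check and the wait.

/* Waiter side (communicator thread): block until something new is published. */
static void
channel_wait_for_work(shm_channel_t *chan, unsigned long already_read)
{
	pthread_mutex_lock(&chan->mutex);
	while (chan->read_pos <= already_read)            /* nothing new yet */
		pthread_cond_wait(&chan->cond, &chan->mutex);
	pthread_mutex_unlock(&chan->mutex);
}

/* Sender side (backend): publish one request and wake a waiting thread. */
static void
channel_publish(shm_channel_t *chan)
{
	pthread_mutex_lock(&chan->mutex);
	chan->read_pos++;                                 /* make the request visible */
	pthread_cond_signal(&chan->cond);
	pthread_mutex_unlock(&chan->mutex);
}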