Fix more bugs

This commit is contained in:
Konstantin Knizhnik
2025-02-18 10:18:13 +02:00
parent 06ebb42db5
commit 914cc529ba
4 changed files with 105 additions and 71 deletions

View File

@@ -269,6 +269,12 @@ lfc_ensure_opened(void)
return true;
}
bool
lfc_enabled(void)
{
return !lfc_maybe_disabled();
}
static void
lfc_shmem_startup(void)
{

View File

@@ -16,6 +16,7 @@
#include "postgres.h"
#include "access/twophase.h"
#include "access/xlog.h"
#include "common/hashfn.h"
#include "fmgr.h"
@@ -48,11 +49,13 @@
#define PageStoreTrace DEBUG5
#define MIN_RECONNECT_INTERVAL_USEC 1000
#define MAX_RECONNECT_INTERVAL_USEC 1000000
#define RECEIVER_RETRY_DELAY_USEC 1000000
#define MAX_REQUEST_SIZE 1024
#define MAX_PS_QUERY_LENGTH 256
#define MIN_RECONNECT_INTERVAL_USEC 1000
#define MAX_RECONNECT_INTERVAL_USEC 1000000
#define RECEIVER_RETRY_DELAY_USEC 1000000
#define MAX_REQUEST_SIZE 1024
#define MAX_PS_QUERY_LENGTH 256
#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
/* GUCs */
char *neon_timeline;
@@ -62,7 +65,7 @@ char *page_server_connstring;
char *neon_auth_token;
int max_prefetch_distance = 128;
int parallel_connections = 10;
int parallel_connections = 1;
int neon_protocol_version = 3;
@@ -72,7 +75,7 @@ static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
#define CHAN_TO_SHARD(chan_no) ((chan_no) / parallel_connections)
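A small sketch of the channel numbering this macro implies, assuming parallel_connections = 2 (illustrative value; the shard_to_chan helper is hypothetical):

/* Each shard owns parallel_connections consecutive channels:
 *   chan_no 0,1 -> shard 0;  chan_no 2,3 -> shard 1;  ...
 * Backends pick one of a shard's channels by backend number, as
 * communicator_send_request does below. */
static inline int
shard_to_chan(int shard, int proc_no)
{
	return shard * parallel_connections + (proc_no % parallel_connections);
}
/* Invariant: CHAN_TO_SHARD(shard_to_chan(s, p)) == s for any backend p. */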
void CommunicatorMain(Datum main_arg);
PGDLLEXPORT void CommunicatorMain(Datum main_arg);
/* Produce error message in critical section for thread safety */
#define neon_shard_log_cs(shard_no, tag, fmt, ...) do { \
@@ -107,8 +110,6 @@ int MyProcNumber;
#define PG_IOV_MAX 32
#endif
static bool am_communicator = false;
typedef enum PSConnectionState {
PS_Disconnected, /* no connection yet */
PS_Connecting_Startup, /* connection starting up */
@@ -235,12 +236,18 @@ ParseShardMap(const char *connstr, ShardMap *result)
return true;
}
static bool
IsCommunicatorProcess(void)
{
return MyBgworkerEntry && strcmp(MyBgworkerEntry->bgw_function_name, "CommunicatorMain") == 0;
}
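IsCommunicatorProcess() keys off bgw_function_name, so it matches only if the worker was registered with exactly "CommunicatorMain". A minimal registration sketch, assuming the shared library is named "neon" (the registration code is not part of this diff):

#include "postgres.h"
#include "postmaster/bgworker.h"

static void
register_communicator_worker(void)
{
	BackgroundWorker worker;

	memset(&worker, 0, sizeof(worker));
	worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
	worker.bgw_start_time = BgWorkerStart_ConsistentState;
	worker.bgw_restart_time = 5;	/* seconds to wait before restart on crash */
	snprintf(worker.bgw_library_name, sizeof(worker.bgw_library_name), "neon");
	snprintf(worker.bgw_function_name, sizeof(worker.bgw_function_name), "CommunicatorMain");
	snprintf(worker.bgw_name, sizeof(worker.bgw_name), "Neon communicator");
	RegisterBackgroundWorker(&worker);
}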
static bool
CheckPageserverConnstring(char **newval, void **extra, GucSource source)
{
char *p = *newval;
return ParseShardMap(p, NULL);
return !IsCommunicatorProcess() || ParseShardMap(p, NULL);
}
static void
@@ -252,7 +259,7 @@ AssignPageserverConnstring(const char *newval, void *extra)
/*
* Only the communicator background worker establishes connections with the page server and needs this information
*/
if (!am_communicator)
if (!IsCommunicatorProcess())
return;
old_num_shards = shard_map.num_shards;
@@ -272,7 +279,7 @@ AssignPageserverConnstring(const char *newval, void *extra)
}
/* Force to reestablish connection with old shards */
for (size_t i = 0; i < old_num_shards; i++)
for (size_t i = 0; i < old_num_shards * parallel_connections; i++)
{
if (page_servers[i].state == PS_Connected)
{
@@ -282,7 +289,7 @@ AssignPageserverConnstring(const char *newval, void *extra)
}
/* Start workers for new shards */
for (size_t i = old_num_shards; i < shard_map.num_shards; i++)
for (size_t i = old_num_shards * parallel_connections; i < shard_map.num_shards * parallel_connections; i++)
{
pthread_t reader, writer;
void* chan_no = (void*)i;
@@ -909,20 +916,15 @@ pageserver_receive(int chan_no)
/* call_PQgetCopyData handles rc == 0 */
Assert(rc > 0);
/* FIXME: is it thread safe? */
PG_TRY();
{
resp_buff.len = rc;
resp_buff.cursor = 0;
resp = nm_unpack_response(&resp_buff);
PQfreemem(resp_buff.data);
}
PG_CATCH();
resp_buff.len = rc;
resp_buff.cursor = 0;
resp = nm_unpack_response(&resp_buff);
PQfreemem(resp_buff.data);
if (resp == NULL)
{
neon_shard_log_cs(shard_no, LOG, "pageserver_receive: disconnect due to failure while parsing response");
pageserver_disconnect(chan_no);
}
PG_END_TRY();
}
else if (rc == -1)
{
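Context for the PG_TRY removal above: PostgreSQL error handling is per-process, not per-thread, and pageserver_receive runs on a pthread. A simplified sketch of what PG_TRY() expands to (see elog.h; details elided):

/* PG_exception_stack is a process-global variable. Redirecting it from a
 * worker pthread races with other threads, and an ereport(ERROR) would
 * siglongjmp onto a jump buffer on another thread's stack. Hence parsing
 * failures are now detected via resp == NULL instead of PG_CATCH. */
do {
	sigjmp_buf *_save_exception_stack = PG_exception_stack;
	sigjmp_buf	_local_sigjmp_buf;

	if (sigsetjmp(_local_sigjmp_buf, 0) == 0)
	{
		PG_exception_stack = &_local_sigjmp_buf;	/* global, not thread-local */
		/* ... guarded code ... */
	}
	PG_exception_stack = _save_exception_stack;
} while (0);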
@@ -955,16 +957,15 @@ check_neon_id(char **newval, void **extra, GucSource source)
return **newval == '\0' || HexDecodeString(id, *newval, 16);
}
/*
* Each backend can send up to max_prefetch_distance prefetch requests and one vectored read request.
* Backends are splitted between parallel conntions so each worker has tpo server at most MaxBackends / parallel_connections
* Backends are split between parallel connections, so each worker has to serve at most PROCARRAY_MAXPROCS / parallel_connections
* backends.
*/
static Size
RequestBufferSize(void)
{
return (max_prefetch_distance + PG_IOV_MAX) * (MaxBackends + (parallel_connections - 1) / parallel_connections);
return (max_prefetch_distance + PG_IOV_MAX) * ((PROCARRAY_MAXPROCS + parallel_connections - 1) / parallel_connections);
}
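A worked sizing example for RequestBufferSize(), using the patch defaults max_prefetch_distance = 128 and PG_IOV_MAX = 32, and assuming MaxBackends = 100, max_prepared_xacts = 2 and parallel_connections = 2 (illustrative values):

/* PROCARRAY_MAXPROCS          = 100 + 2         = 102
 * backends per channel (ceil) = (102 + 1) / 2   = 51
 * slots per channel           = (128 + 32) * 51 = 8160
 * Each backend may have max_prefetch_distance prefetches plus one vectored
 * read of up to PG_IOV_MAX pages in flight, hence the (128 + 32) factor. */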
static Size
@@ -972,7 +973,7 @@ CommunicatorShmemSize(void)
{
return RequestBufferSize() * MaxNumberOfChannels() * sizeof(NeonCommunicatorRequest)
+ MaxNumberOfChannels() * sizeof(NeonCommunicatorChannel)
+ sizeof(NeonCommunicatorResponse) * MaxBackends;
+ sizeof(NeonCommunicatorResponse) * PROCARRAY_MAXPROCS;
}
static Size
@@ -1062,7 +1063,7 @@ communicator_send_request(int shard, NeonCommunicatorRequest* req)
/* bind backend to the particular channel */
NeonCommunicatorChannel* chan = &channels[shard * parallel_connections + (MyProcNumber % parallel_connections)];
size_t ring_size = RequestBufferSize();
uint64 write_pos = pg_atomic_add_fetch_u64(&chan->write_pos, 1); /* reserve write position */
uint64 write_pos = pg_atomic_fetch_add_u64(&chan->write_pos, 1); /* reserve write position */
uint64 read_pos;
Assert(req->hdr.u.reqid == 0); /* ring overflow should not happen */
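The atomics change above is the actual fix here: pg_atomic_fetch_add_u64 returns the value before the increment, i.e. exactly the slot this producer reserved, while pg_atomic_add_fetch_u64 returns the post-increment value, so every request would land one slot ahead of where the consumer reads. A sketch (chan, req and ring_size as in the surrounding function):

uint64	slot = pg_atomic_fetch_add_u64(&chan->write_pos, 1);	/* pre-increment value */
NeonCommunicatorRequest *dst = &chan->requests[slot % ring_size];

*dst = *req;	/* fill exactly the reserved slot */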
@@ -1099,6 +1100,7 @@ communicator_receive_response(void)
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
-1L,
WAIT_EVENT_NEON_PS_READ);
ResetLatch(MyLatch);
}
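The added ResetLatch follows the standard latch protocol: wake up, reset, then loop around and re-check the condition, so a SetLatch arriving between the check and the next WaitLatch is never lost. Generic form of the pattern (response_arrived() is a hypothetical condition):

for (;;)
{
	if (response_arrived())
		break;
	(void) WaitLatch(MyLatch,
					 WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
					 -1L,
					 WAIT_EVENT_NEON_PS_READ);
	ResetLatch(MyLatch);	/* reset before re-checking the condition */
}

Without the reset, the first SetLatch leaves the latch permanently set, every subsequent WaitLatch returns immediately, and the wait degenerates into a busy loop.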
if (responses[MyProcNumber].tag == T_NeonErrorResponse)
{
@@ -1121,8 +1123,6 @@ communicator_request(int shard, NeonCommunicatorRequest* req)
void
CommunicatorMain(Datum main_arg)
{
am_communicator = true;
/* Establish signal handlers. */
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
pqsignal(SIGHUP, SignalHandlerForConfigReload);
@@ -1135,6 +1135,7 @@ CommunicatorMain(Datum main_arg)
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
-1L,
PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}
}
@@ -1210,11 +1211,12 @@ pg_init_libpagestore(void)
GUC_UNIT_MB,
NULL, NULL, NULL);
/* FIXME: enforce that effective_io_concurrency and maintenance_io_concurrency can not be set larger than max_prefetch_distance */
DefineCustomIntVariable("neon.max_prefetch_distance",
"Maximal number of prefetch requests",
"effetive_io_concurrency and maintenance_io_concurrecy should not be larger than sthis value",
&max_prefetch_distance,
128, 16, 1024,
128, 0, 1024,
PGC_POSTMASTER,
0, /* no flags required */
NULL, NULL, NULL);
@@ -1223,7 +1225,7 @@ pg_init_libpagestore(void)
"number of connections to each shard",
NULL,
&parallel_connections,
10, 1, 16,
1, 1, 16,
PGC_POSTMASTER,
0, /* no flags required */
NULL, NULL, NULL);
@@ -1279,10 +1281,11 @@ allocStringInfo(StringInfo s, size_t size)
static void*
communicator_write_loop(void* arg)
{
uint64 read_start_pos = 0;
size_t chan_no = (size_t)arg;
uint64 read_start_pos = 0;
size_t chan_no = (size_t)arg;
int shard_no = CHAN_TO_SHARD(chan_no);
NeonCommunicatorChannel* chan = &channels[chan_no];
size_t ring_size = RequestBufferSize();
size_t ring_size = RequestBufferSize();
StringInfoData s;
allocStringInfo(&s, MAX_REQUEST_SIZE);
@@ -1290,25 +1293,33 @@ communicator_write_loop(void* arg)
while (true)
{
NeonCommunicatorRequest* req;
uint64 read_end_pos;
/* Number of shards is decreased so this worker is not needed any more */
if (chan_no >= shard_map.num_shards * parallel_connections)
return NULL;
read_end_pos = pg_atomic_read_u64(&chan->read_pos);
Assert(read_start_pos <= read_end_pos);
while (read_start_pos == read_end_pos)
{
int events = WaitLatch(&chan->latch, WL_LATCH_SET|WL_POSTMASTER_DEATH, -1, WAIT_EVENT_NEON_PS_SEND);
neon_shard_log_cs(shard_no, LOG, "Shard %d is not online any more (num_shards=%d)", (int)shard_no, (int)shard_map.num_shards);
return NULL;
}
while (true)
{
int events;
uint64 read_end_pos = pg_atomic_read_u64(&chan->read_pos);
Assert(read_start_pos <= read_end_pos);
if (read_start_pos < read_end_pos)
{
break;
}
events = WaitLatch(&chan->latch, WL_LATCH_SET|WL_POSTMASTER_DEATH, -1, WAIT_EVENT_NEON_PS_SEND);
if (events & WL_POSTMASTER_DEATH)
return NULL;
ResetLatch(&chan->latch);
}
elog(LOG, "Communicator %d receive request at %ld", (int)chan_no, (long)read_start_pos);
req = &chan->requests[read_start_pos++ % ring_size];
nm_pack_request(&s, &req->hdr);
Assert(s.maxlen == MAX_REQUEST_SIZE); /* string buffer was not reallocated */
pageserver_send(chan_no, &s);
req->hdr.u.reqid = 0; /* mark requests as processed */
pageserver_send(chan_no, &s);
}
}
@@ -1322,6 +1333,7 @@ communicator_read_loop(void* arg)
int64 value = 0;
size_t chan_no = (size_t)arg;
int shard_no = CHAN_TO_SHARD(chan_no);
bool notify_backend = false;
while (true)
{
@@ -1336,6 +1348,7 @@ communicator_read_loop(void* arg)
pg_usleep(RECEIVER_RETRY_DELAY_USEC);
continue;
}
notify_backend = true;
switch (resp->tag)
{
case T_NeonExistsResponse:
@@ -1354,8 +1367,7 @@ communicator_read_loop(void* arg)
{
/* result of prefetch */
(void) lfc_prefetch(page_resp->req.rinfo, page_resp->req.forknum, page_resp->req.blkno, page_resp->page, resp->not_modified_since);
free(resp);
continue; /* should not notify backend */
notify_backend = false;
}
else
{
@@ -1365,12 +1377,20 @@ communicator_read_loop(void* arg)
InitBufferTag(&tag, &page_resp->req.rinfo, page_resp->req.forknum, page_resp->req.blkno);
if (!BufferTagsEqual(&buf_desc->tag, &tag))
{
neon_shard_log_cs(shard_no, PANIC, "Get page request {rel=%u/%u/%u.%u block=%u} referecing wrpng buffer {rel=%u/%u/%u.%u block=%u}",
/*
* It can happen that the backend was terminated before the response was received from the page server.
* So don't treat this as an error: just log and ignore the response.
*/
neon_shard_log_cs(shard_no, LOG, "Get page request {rel=%u/%u/%u.%u block=%u} referencing wrong buffer {rel=%u/%u/%u.%u block=%u}",
RelFileInfoFmt(page_resp->req.rinfo), page_resp->req.forknum, page_resp->req.blkno,
RelFileInfoFmt(BufTagGetNRelFileInfo(buf_desc->tag)), buf_desc->tag.forkNum, buf_desc->tag.blockNum);
notify_backend = false;
}
else
{
/* Copy page content to shared buffer */
memcpy(BufferGetBlock(resp->u.recepient.bufid), page_resp->page, BLCKSZ);
}
/* Copy page content to shared buffer */
memcpy(BufferGetBlock(resp->u.recepient.bufid), page_resp->page, BLCKSZ);
}
break;
}
@@ -1380,11 +1400,14 @@ communicator_read_loop(void* arg)
default:
break;
}
responses[resp->u.recepient.procno].value = value;
/* enforce write barrier before writing response code which server as received response indicator */
pg_write_barrier();
responses[resp->u.recepient.procno].tag = resp->tag;
SetLatch(&ProcGlobal->allProcs[resp->u.recepient.procno].procLatch);
if (notify_backend)
{
responses[resp->u.recepient.procno].value = value;
/* enforce write barrier before writing the response tag, which is used as the response-received indicator */
pg_write_barrier();
responses[resp->u.recepient.procno].tag = resp->tag;
SetLatch(&ProcGlobal->allProcs[resp->u.recepient.procno].procLatch);
}
free(resp);
}
}
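The write barrier above pairs with a read barrier on the backend side: the backend must observe tag first, fence, then read value. The exact backend code is not in this diff, so the following is an assumption (T_NeonNoResponse as an "empty" sentinel is hypothetical):

/* Consumer side, pairing with pg_write_barrier() in communicator_read_loop:
 * tag is written last, so once tag is visible, value is guaranteed visible
 * after a read barrier. */
while (responses[MyProcNumber].tag == T_NeonNoResponse)
{
	(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
					 -1L, WAIT_EVENT_NEON_PS_READ);
	ResetLatch(MyLatch);
}
pg_read_barrier();	/* pairs with pg_write_barrier() */
value = responses[MyProcNumber].value;	/* safe to read now */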

View File

@@ -301,6 +301,7 @@ extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
extern void lfc_init(void);
extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
const void* buffer, XLogRecPtr lsn);
extern bool lfc_enabled(void);
static inline bool

View File

@@ -337,7 +337,6 @@ nm_unpack_response(StringInfo s)
case T_NeonDbSizeRequest:
case T_NeonGetSlruSegmentRequest:
default:
neon_log(ERROR, "unexpected neon message tag 0x%02x", tag);
break;
}
@@ -1370,24 +1369,27 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
neon_get_request_lsns(InfoFromSMgrRel(reln),
forknum, blocknum,
request_lsns, nblocks, NULL);
for (int i = 0; i < nblocks; i++)
/* Prefetch results are placed in the LFC, so there is no need to send prefetch requests if the LFC is disabled */
if (lfc_enabled())
{
if (!lfc_cache_contains(InfoFromSMgrRel(reln), forknum, blocknum + i))
neon_get_request_lsns(InfoFromSMgrRel(reln),
forknum, blocknum,
request_lsns, nblocks, NULL);
for (int i = 0; i < nblocks; i++)
{
NeonCommunicatorRequest request = {
.page.hdr.tag = T_NeonGetPageRequest,
.page.hdr.u.recepient.bufid = InvalidBuffer,
.page.hdr.lsn = request_lsns[i].request_lsn,
.page.hdr.not_modified_since = request_lsns[i].not_modified_since,
.page.rinfo = InfoFromSMgrRel(reln),
.page.forknum = forknum,
.page.blkno = blocknum + i,
};
(void)communicator_send_request(get_shard_number(InfoFromSMgrRel(reln), blocknum + i), &request);
if (!lfc_cache_contains(InfoFromSMgrRel(reln), forknum, blocknum + i))
{
NeonCommunicatorRequest request = {
.page.hdr.tag = T_NeonGetPageRequest,
.page.hdr.u.recepient.bufid = InvalidBuffer,
.page.hdr.lsn = request_lsns[i].request_lsn,
.page.hdr.not_modified_since = request_lsns[i].not_modified_since,
.page.rinfo = InfoFromSMgrRel(reln),
.page.forknum = forknum,
.page.blkno = blocknum + i,
};
(void)communicator_send_request(get_shard_number(InfoFromSMgrRel(reln), blocknum + i), &request);
}
}
}
return false;
@@ -1415,7 +1417,9 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
if (!lfc_cache_contains(InfoFromSMgrRel(reln), forknum, blocknum))
/* Prefetch results are placed in the LFC, so there is no need to send prefetch requests if the LFC is disabled */
if (lfc_enabled()
&& !lfc_cache_contains(InfoFromSMgrRel(reln), forknum, blocknum))
{
neon_request_lsns request_lsns;
neon_get_request_lsns(InfoFromSMgrRel(reln),