First working version

This commit is contained in:
Konstantin Knizhnik
2025-02-19 16:59:07 +02:00
parent 426101e38f
commit bc07358034
2 changed files with 217 additions and 470 deletions

View File

@@ -13,6 +13,7 @@
*-------------------------------------------------------------------------
*/
#include <pthread.h>
#include <poll.h>
#include "postgres.h"
@@ -65,7 +66,7 @@ char *page_server_connstring;
char *neon_auth_token;
int max_prefetch_distance = 128;
int parallel_connections = 1;
int parallel_connections = 10;
int neon_protocol_version = 3;
@@ -111,10 +112,8 @@ int MyProcNumber;
/*
 * Lifecycle states of a pageserver connection, as stored in the 'state'
 * field of a PageServer entry. The per-value comments describe each state.
 */
typedef enum PSConnectionState {
PS_Disconnected, /* no connection yet */
PS_Connecting_Startup, /* connection starting up */
PS_Connecting_PageStream, /* negotiating pagestream */
PS_Connected, /* connected, pagestream established */
PS_Expired, /* connection should be reestablished */
PS_PendingDisconnect, /* connection should be reestablished */
} PSConnectionState;
/* This backend's per-shard connections */
@@ -138,14 +137,6 @@ typedef struct
/* request / response counters for debugging */
uint64 nrequests_sent;
uint64 nresponses_received;
/*---
* WaitEventSet containing:
* - WL_SOCKET_READABLE on 'conn'
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *wes_read;
} PageServer;
static PageServer* page_servers;
@@ -174,7 +165,7 @@ log_error_message(NeonErrorResponse* err)
{
int save_pid = MyProcPid;
pthread_mutex_lock(&mutex);
MyProcPid = ProcGlobal->allProcs[err->req.u.recepient.procno].pid;
MyProcPid = ProcGlobal->allProcs[err->req.u.recepient.procno-1].pid;
neon_log(LOG, "Server returns error for request %d: %s", err->req.tag, err->message);
MyProcPid = save_pid;
pthread_mutex_unlock(&mutex);
@@ -282,7 +273,7 @@ AssignPageserverConnstring(const char *newval, void *extra)
if (page_servers[i].state == PS_Connected)
{
/* TODO: race condition */
page_servers[i].state = PS_Expired;
page_servers[i].state = PS_PendingDisconnect;
}
}
@@ -293,6 +284,8 @@ AssignPageserverConnstring(const char *newval, void *extra)
void* chan_no = (void*)i;
pthread_create(&writer, NULL, communicator_write_loop, chan_no);
pthread_create(&reader, NULL, communicator_read_loop, chan_no);
pthread_detach(reader);
pthread_detach(writer);
}
}
@@ -318,13 +311,6 @@ get_shard_number(NRelFileInfo rinfo, BlockNumber blocknum)
static void
cleanup_and_disconnect(PageServer *ps)
{
pthread_mutex_lock(&mutex);
if (ps->wes_read)
{
FreeWaitEventSet(ps->wes_read);
ps->wes_read = NULL;
}
if (ps->conn)
{
MyNeonCounters->pageserver_disconnects_total++;
@@ -333,8 +319,6 @@ cleanup_and_disconnect(PageServer *ps)
}
ps->state = PS_Disconnected;
pthread_mutex_unlock(&mutex);
}
/*
@@ -351,6 +335,90 @@ chomp(char const* in)
return strndup(in, n);
}
/*
 * TODO: we have to define these pq_send/pq_poll functions to perform raw writes to a non-blocking socket.
 * Is there a better/more portable/simpler way in Postgres to do it?
 */
#if PG_VERSION_NUM < 170000
/*
 * Local definition of PQsocketPoll(), compiled only when PG_VERSION_NUM is
 * below 170000 (presumably because libpq itself provides the function from
 * that version on -- confirm against the libpq headers in use).
 *
 * Waits with poll(2) until 'sock' becomes ready or the deadline passes.
 *
 *	sock	 - socket descriptor to wait on
 *	forRead	 - nonzero: wait for the socket to become readable (POLLIN)
 *	forWrite - nonzero: wait for the socket to become writable (POLLOUT)
 *	end_time - absolute deadline in time() seconds; (time_t) -1 means wait
 *			   without a timeout, a deadline in the past means do not block
 *
 * Returns 0 without calling poll() if neither forRead nor forWrite is set.
 * Otherwise returns the result of poll(): > 0 if the socket is ready, 0 on
 * timeout, < 0 on error (errno is set by poll()).
 */
static int
PQsocketPoll(int sock, int forRead, int forWrite, time_t end_time)
{
	/* Largest timeout that fits into the 'int' argument of poll() */
	const int	max_timeout_ms = (int) (~0U >> 1);
	struct pollfd input_fd;
	int			timeout_ms;

	if (!forRead && !forWrite)
		return 0;

	input_fd.fd = sock;
	input_fd.events = POLLERR;
	input_fd.revents = 0;

	if (forRead)
		input_fd.events |= POLLIN;
	if (forWrite)
		input_fd.events |= POLLOUT;

	/* Compute appropriate timeout interval */
	if (end_time == ((time_t) -1))
		timeout_ms = -1;
	else
	{
		time_t		now = time(NULL);

		if (end_time > now)
		{
			time_t		remaining_s = end_time - now;

			/*
			 * Clamp instead of letting the seconds-to-milliseconds conversion
			 * overflow 'int': an overflowed value could turn negative, which
			 * poll() interprets as "wait forever".
			 */
			if (remaining_s > max_timeout_ms / 1000)
				timeout_ms = max_timeout_ms;
			else
				timeout_ms = (int) (remaining_s * 1000);
		}
		else
			timeout_ms = 0;
	}

	return poll(&input_fd, 1, timeout_ms);
}
#endif
/*
 * Wait on the socket of 'conn' via PQsocketPoll(), transparently restarting
 * the wait whenever it is interrupted by a signal (EINTR).
 *
 * forRead / forWrite / end_time are passed through to PQsocketPoll()
 * unchanged. Returns the first PQsocketPoll() result that is not an
 * EINTR failure.
 */
static int
pq_poll(PGconn *conn, int forRead, int forWrite, time_t end_time)
{
	for (;;)
	{
		int			rc = PQsocketPoll(PQsocket(conn), forRead, forWrite, end_time);

		/* Done unless the wait failed solely because of a signal */
		if (rc >= 0 || errno != EINTR)
			return rc;
	}
}
/*
 * Write all 'len' bytes starting at 'ptr' to the socket of 'conn' using
 * send(2) directly.
 *
 * Partial writes are continued from where they stopped. If the socket would
 * block, the function waits (without timeout) via pq_poll() until it is
 * writable again; EINTR from send() is retried immediately.
 *
 * Returns the number of bytes written (== len) on success, or a negative
 * value if send() or pq_poll() failed; errno then holds the cause.
 *
 * NOTE(review): this writes to the descriptor returned by PQsocket() rather
 * than going through libpq's own send path, so it presumably is only valid
 * for connections without transport encryption -- confirm.
 */
static ssize_t
pq_send(PGconn *conn, const void *ptr, size_t len)
{
	const char *data = (const char *) ptr;
	size_t		offs = 0;

	while (offs < len)
	{
		ssize_t		rc = send(PQsocket(conn), data + offs, len - offs, 0);

		if (rc >= 0)
		{
			offs += rc;
			continue;
		}

		/* Anything except EAGAIN/EWOULDBLOCK/EINTR is trouble */
		switch (errno)
		{
			/*
			 * POSIX allows EWOULDBLOCK to differ from EAGAIN; only add the
			 * extra label when they are distinct, otherwise it would be a
			 * duplicate case value.
			 */
#if defined(EWOULDBLOCK) && (EWOULDBLOCK != EAGAIN)
			case EWOULDBLOCK:
#endif
			case EAGAIN:
				break;
			case EINTR:
				continue;		/* interrupted: retry the send() */
			default:
				return rc;
		}

		/* Socket buffer is full: wait until it becomes writable again */
		rc = pq_poll(conn, false, true, -1);
		if (rc < 0)
		{
			return rc;
		}
	}
	return offs;
}
/*
* Connect to a pageserver, or continue to try to connect if we're yet to
* complete the connection (e.g. due to receiving an earlier cancellation
@@ -367,415 +435,103 @@ pageserver_connect(int chan_no, int elevel)
char pagestream_query[MAX_PS_QUERY_LENGTH];
int shard_no = CHAN_TO_SHARD(chan_no);
char* connstr = shard_map.connstring[shard_no];
const char *keywords[3];
const char *values[3];
int n_pgsql_params;
TimestampTz now;
int64 us_since_last_attempt;
PGresult* res;
switch (ps->state)
{
case PS_Disconnected:
{
const char *keywords[3];
const char *values[3];
int n_pgsql_params;
TimestampTz now;
int64 us_since_last_attempt;
/* Make sure we start with a clean slate */
cleanup_and_disconnect(ps);
/* Make sure we start with a clean slate */
cleanup_and_disconnect(ps);
neon_shard_log_cs(shard_no, DEBUG5, "Connection state: Disconnected");
neon_shard_log_cs(shard_no, DEBUG5, "Connection state: Disconnected");
now = GetCurrentTimestamp();
us_since_last_attempt = (int64) (now - ps->last_reconnect_time);
ps->last_reconnect_time = now;
now = GetCurrentTimestamp();
us_since_last_attempt = (int64) (now - ps->last_reconnect_time);
ps->last_reconnect_time = now;
/*
* Make sure we don't do exponential backoff with a constant multiplier
* of 0 us, as that doesn't really do much for timeouts...
*
* cf. https://github.com/neondatabase/neon/issues/7897
*/
if (ps->delay_us == 0)
ps->delay_us = MIN_RECONNECT_INTERVAL_USEC;
/*
* If we did other tasks between reconnect attempts, then we won't
* need to wait as long as a full delay.
*/
if (us_since_last_attempt < ps->delay_us)
{
pg_usleep(ps->delay_us - us_since_last_attempt);
}
/* update the delay metric */
ps->delay_us = Min(ps->delay_us * 2, MAX_RECONNECT_INTERVAL_USEC);
/*
* Connect using the connection string we got from the
* neon.pageserver_connstring GUC. If the NEON_AUTH_TOKEN environment
* variable was set, use that as the password.
*
* The connection options are parsed in the order they're given, so when
* we set the password before the connection string, the connection string
* can override the password from the env variable. Seems useful, although
* we don't currently use that capability anywhere.
*/
keywords[0] = "dbname";
values[0] = connstr;
n_pgsql_params = 1;
if (neon_auth_token)
{
keywords[1] = "password";
values[1] = neon_auth_token;
n_pgsql_params++;
}
keywords[n_pgsql_params] = NULL;
values[n_pgsql_params] = NULL;
ps->conn = PQconnectStartParams(keywords, values, 1);
if (PQstatus(ps->conn) == CONNECTION_BAD)
{
char *msg = chomp(PQerrorMessage(ps->conn));
cleanup_and_disconnect(ps);
ereport(elevel,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", chan_no),
errdetail_internal("%s", msg)));
free(msg);
return false;
}
ps->state = PS_Connecting_Startup;
}
/* FALLTHROUGH */
case PS_Connecting_Startup:
{
int ps_send_query_ret;
bool connected = false;
int poll_result = PGRES_POLLING_WRITING;
neon_shard_log_cs(shard_no, DEBUG5, "Connection state: Connecting_Startup");
do
{
switch (poll_result)
{
default: /* unknown/unused states are handled as a failed connection */
case PGRES_POLLING_FAILED:
{
char *pqerr = PQerrorMessage(ps->conn);
char *msg = NULL;
neon_shard_log_cs(shard_no, DEBUG5, "POLLING_FAILED");
if (pqerr)
msg = chomp(pqerr);
cleanup_and_disconnect(ps);
if (msg)
{
neon_shard_log_cs(shard_no, elevel,
"could not connect to pageserver: %s",
msg);
free(msg);
}
else
neon_shard_log_cs(shard_no, elevel,
"could not connect to pageserver");
return false;
}
case PGRES_POLLING_READING:
/* Sleep until there's something to do */
while (true)
{
int rc = WaitLatchOrSocket(MyLatch,
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | WL_SOCKET_READABLE,
PQsocket(ps->conn),
0,
WAIT_EVENT_NEON_PS_STARTING);
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
/* query cancellation, backend shutdown */
CHECK_FOR_INTERRUPTS();
}
if (rc & WL_SOCKET_READABLE)
break;
}
/* PQconnectPoll() handles the socket polling state updates */
break;
case PGRES_POLLING_WRITING:
/* Sleep until there's something to do */
while (true)
{
int rc = WaitLatchOrSocket(MyLatch,
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | WL_SOCKET_WRITEABLE,
PQsocket(ps->conn),
0,
WAIT_EVENT_NEON_PS_STARTING);
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
/* query cancellation, backend shutdown */
CHECK_FOR_INTERRUPTS();
}
if (rc & WL_SOCKET_WRITEABLE)
break;
}
/* PQconnectPoll() handles the socket polling state updates */
break;
case PGRES_POLLING_OK:
neon_shard_log_cs(shard_no, DEBUG5, "POLLING_OK");
connected = true;
break;
}
poll_result = PQconnectPoll(ps->conn);
}
while (!connected);
/* No more polling needed; connection succeeded */
ps->last_connect_time = GetCurrentTimestamp();
/* Allocate wait event set in critical section */
pthread_mutex_lock(&mutex);
#if PG_MAJORVERSION_NUM >= 17
ps->wes_read = CreateWaitEventSet(NULL, 3);
#else
ps->wes_read = CreateWaitEventSet(TopMemoryContext, 3);
#endif
AddWaitEventToSet(ps->wes_read, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(ps->wes_read, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(ps->wes_read, WL_SOCKET_READABLE, PQsocket(ps->conn), NULL, NULL);
pthread_mutex_unlock(&mutex);
switch (neon_protocol_version)
{
case 3:
snprintf(pagestream_query, sizeof pagestream_query, "pagestream_v3 %s %s", neon_tenant, neon_timeline);
break;
case 2:
snprintf(pagestream_query, sizeof pagestream_query, "pagestream_v2 %s %s", neon_tenant, neon_timeline);
break;
default:
neon_shard_log_cs(shard_no, ERROR, "unexpected neon_protocol_version %d", neon_protocol_version);
}
if (PQstatus(ps->conn) == CONNECTION_BAD)
{
char *msg = chomp(PQerrorMessage(ps->conn));
cleanup_and_disconnect(ps);
ereport(elevel,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", chan_no),
errdetail_internal("%s", msg)));
free(msg);
return false;
}
ps_send_query_ret = PQsendQuery(ps->conn, pagestream_query);
if (ps_send_query_ret != 1)
{
cleanup_and_disconnect(ps);
neon_shard_log_cs(shard_no, elevel, "could not send pagestream command to pageserver");
return false;
}
ps->state = PS_Connecting_PageStream;
}
/* FALLTHROUGH */
case PS_Connecting_PageStream:
{
neon_shard_log_cs(shard_no, DEBUG5, "Connection state: Connecting_PageStream");
if (PQstatus(ps->conn) == CONNECTION_BAD)
{
char *msg = chomp(PQerrorMessage(ps->conn));
cleanup_and_disconnect(ps);
ereport(elevel,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", chan_no),
errdetail_internal("%s", msg)));
free(msg);
return false;
}
while (PQisBusy(ps->conn))
{
WaitEvent event;
/* Sleep until there's something to do */
(void) WaitEventSetWait(ps->wes_read, -1L, &event, 1,
WAIT_EVENT_NEON_PS_CONFIGURING);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket? */
if (event.events & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(ps->conn))
{
char *msg = chomp(PQerrorMessage(ps->conn));
cleanup_and_disconnect(ps);
neon_shard_log_cs(shard_no, elevel, "could not complete handshake with pageserver: %s",
msg);
free(msg);
return false;
}
}
}
ps->state = PS_Connected;
ps->nrequests_sent = 0;
ps->nresponses_received = 0;
}
/* FALLTHROUGH */
case PS_Connected:
/*
* We successfully connected. Future connections to this PageServer
* will do fast retries again, with exponential backoff.
*/
/*
* Make sure we don't do exponential backoff with a constant multiplier
* of 0 us, as that doesn't really do much for timeouts...
*
* cf. https://github.com/neondatabase/neon/issues/7897
*/
if (ps->delay_us == 0)
ps->delay_us = MIN_RECONNECT_INTERVAL_USEC;
neon_shard_log_cs(shard_no, LOG, "libpagestore: connected to '%s' with protocol version %d", connstr, neon_protocol_version);
return true;
default:
neon_shard_log_cs(shard_no, ERROR, "libpagestore: invalid connection state %d", ps->state);
/*
* If we did other tasks between reconnect attempts, then we won't
* need to wait as long as a full delay.
*/
if (us_since_last_attempt < ps->delay_us)
{
pg_usleep(ps->delay_us - us_since_last_attempt);
}
/* This shouldn't be hit */
Assert(false);
}
/*
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
*/
static int
call_PQgetCopyData(int chan_no, char **buffer)
{
int ret;
PageServer *ps = &page_servers[chan_no];
PGconn *pageserver_conn = ps->conn;
instr_time now,
start_ts,
since_start,
last_log_ts,
since_last_log;
bool logged = false;
int shard_no = CHAN_TO_SHARD(chan_no);
/* update the delay metric */
ps->delay_us = Min(ps->delay_us * 2, MAX_RECONNECT_INTERVAL_USEC);
/*
* As a debugging aid, if we don't get a response for a long time, print a
* log message.
* Connect using the connection string we got from the
* neon.pageserver_connstring GUC. If the NEON_AUTH_TOKEN environment
* variable was set, use that as the password.
*
* 10 s is a very generous threshold, normally we expect a response in a
* few milliseconds. We have metrics to track latencies in normal ranges,
* but in the cases that take exceptionally long, it's useful to log the
* exact timestamps.
* The connection options are parsed in the order they're given, so when
* we set the password before the connection string, the connection string
* can override the password from the env variable. Seems useful, although
* we don't currently use that capability anywhere.
*/
#define LOG_INTERVAL_MS INT64CONST(10 * 1000)
keywords[0] = "dbname";
values[0] = connstr;
n_pgsql_params = 1;
INSTR_TIME_SET_CURRENT(now);
start_ts = last_log_ts = now;
INSTR_TIME_SET_ZERO(since_last_log);
retry:
ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ );
if (ret == 0)
if (neon_auth_token)
{
WaitEvent event;
long timeout;
timeout = Max(0, LOG_INTERVAL_MS - INSTR_TIME_GET_MILLISEC(since_last_log));
/* Sleep until there's something to do */
(void) WaitEventSetWait(ps->wes_read, timeout, &event, 1,
WAIT_EVENT_NEON_PS_READ);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket? */
if (event.events & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(pageserver_conn))
{
char *msg = chomp(PQerrorMessage(pageserver_conn));
neon_shard_log_cs(shard_no, LOG, "could not get response from pageserver: %s", msg);
free(msg);
return -1;
}
}
/*
* Print a message to the log if a long time has passed with no
* response.
*/
INSTR_TIME_SET_CURRENT(now);
since_last_log = now;
INSTR_TIME_SUBTRACT(since_last_log, last_log_ts);
if (INSTR_TIME_GET_MILLISEC(since_last_log) >= LOG_INTERVAL_MS)
{
int sndbuf = -1;
int recvbuf = -1;
#ifdef __linux__
int socketfd;
#endif
since_start = now;
INSTR_TIME_SUBTRACT(since_start, start_ts);
#ifdef __linux__
/*
* get kernel's send and recv queue size via ioctl
* https://elixir.bootlin.com/linux/v6.1.128/source/include/uapi/linux/sockios.h#L25-L27
*/
socketfd = PQsocket(pageserver_conn);
if (socketfd != -1) {
int ioctl_err;
ioctl_err = ioctl(socketfd, SIOCOUTQ, &sndbuf);
if (ioctl_err!= 0) {
sndbuf = -errno;
}
ioctl_err = ioctl(socketfd, FIONREAD, &recvbuf);
if (ioctl_err != 0) {
recvbuf = -errno;
}
}
#endif
neon_shard_log_cs(shard_no, LOG, "no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses) (socket sndbuf=%d recvbuf=%d)",
INSTR_TIME_GET_DOUBLE(since_start),
ps->nrequests_sent, ps->nresponses_received, sndbuf, recvbuf);
last_log_ts = now;
logged = true;
}
goto retry;
keywords[1] = "password";
values[1] = neon_auth_token;
n_pgsql_params++;
}
/*
* If we logged earlier that the response is taking a long time, log
* another message when the response is finally received.
*/
if (logged)
{
INSTR_TIME_SET_CURRENT(now);
since_start = now;
INSTR_TIME_SUBTRACT(since_start, start_ts);
neon_shard_log_cs(shard_no, LOG, "received response from pageserver after %0.3f s",
INSTR_TIME_GET_DOUBLE(since_start));
}
keywords[n_pgsql_params] = NULL;
values[n_pgsql_params] = NULL;
return ret;
ps->conn = PQconnectdbParams(keywords, values, 1);
if (PQstatus(ps->conn) == CONNECTION_BAD)
{
char *msg = chomp(PQerrorMessage(ps->conn));
cleanup_and_disconnect(ps);
ereport(elevel,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", chan_no),
errdetail_internal("%s", msg)));
free(msg);
return false;
}
ps->last_connect_time = GetCurrentTimestamp();
snprintf(pagestream_query, sizeof pagestream_query, "pagestream_v%d %s %s", neon_protocol_version, neon_tenant, neon_timeline);
res = PQexec(ps->conn, pagestream_query);
if (PQresultStatus(res) != PGRES_COPY_BOTH)
{
char *msg = chomp(PQerrorMessage(ps->conn));
cleanup_and_disconnect(ps);
ereport(elevel,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg(NEON_TAG "[shard %d] could perform handshake with pageserver", chan_no),
errdetail_internal("%s", msg)));
free(msg);
return false;
}
PQclear(res);
ps->state = PS_Connected;
ps->nrequests_sent = 0;
ps->nresponses_received = 0;
ps->delay_us = MIN_RECONNECT_INTERVAL_USEC;
neon_shard_log_cs(shard_no, LOG, "libpagestore: connected to '%s' with protocol version %d", connstr, neon_protocol_version);
return true;
}
/*
@@ -827,7 +583,7 @@ pageserver_send(int chan_no, StringInfo msg)
*/
if (ps->state != PS_Connected)
{
if (ps->state == PS_Expired)
if (ps->state == PS_PendingDisconnect)
{
neon_shard_log_cs(shard_no, LOG, "pageserver_send disconnect expired connection");
pageserver_disconnect(chan_no);
@@ -847,37 +603,17 @@ pageserver_send(int chan_no, StringInfo msg)
/*
* Send request.
*
* In principle, this could block if the output buffer is full, and we
* should use async mode and check for interrupts while waiting. In
* practice, our requests are small enough to always fit in the output and
* TCP buffer.
*
* Note that this also will fail when the connection is in the
* PGRES_POLLING_WRITING state. It's kinda dirty to disconnect at this
* point, but on the grand scheme of things it's only a small issue.
* We can not use PQputCopyData because it tries to read input messages.
* So we have to do it "manually".
*/
ps->nrequests_sent++;
if (PQputCopyData(pageserver_conn, msg->data, msg->len) <= 0)
if (pq_send(pageserver_conn, msg->data, msg->len) < 0)
{
char *msg = chomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect(chan_no);
neon_shard_log_cs(shard_no, LOG, "pageserver_send disconnected: failed to send page request (try to reconnect): %s", msg);
free(msg);
return false;
}
/*
* TODO: may be not flush each request?
*/
if (PQflush(pageserver_conn))
{
char *msg = chomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect(chan_no);
neon_shard_log_cs(shard_no, LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg);
free(msg);
return false;
}
return true;
}
@@ -890,7 +626,7 @@ static NeonResponse *
pageserver_receive(int chan_no)
{
StringInfoData resp_buff;
NeonResponse *resp;
NeonResponse *resp = NULL;
PageServer *ps = &page_servers[chan_no];
PGconn *pageserver_conn = ps->conn;
int shard_no = CHAN_TO_SHARD(chan_no);
@@ -908,7 +644,8 @@ pageserver_receive(int chan_no)
Assert(pageserver_conn);
rc = call_PQgetCopyData(chan_no, &resp_buff.data);
resp_buff.data = NULL;
rc = PQgetCopyData(pageserver_conn, &resp_buff.data, false);
if (rc >= 0)
{
/* call_PQgetCopyData handles rc == 0 */
@@ -927,20 +664,18 @@ pageserver_receive(int chan_no)
else if (rc == -1)
{
neon_shard_log_cs(shard_no, LOG, "pageserver_receive disconnect: psql end of copy data: %s", chomp(PQerrorMessage(pageserver_conn)));
pageserver_disconnect(chan_no);
resp = NULL;
ps->state = PS_PendingDisconnect;
}
else if (rc == -2)
{
char *msg = chomp(PQerrorMessage(pageserver_conn));
pageserver_disconnect(chan_no);
neon_shard_log_cs(shard_no, ERROR, "pageserver_receive disconnect: could not read COPY data: %s", msg);
ps->state = PS_PendingDisconnect;
neon_shard_log_cs(shard_no, LOG, "pageserver_receive disconnect: could not read COPY data: %s", msg);
}
else
{
pageserver_disconnect(chan_no);
neon_shard_log_cs(shard_no, ERROR, "pageserver_receive disconnect: unexpected PQgetCopyData return value: %d", rc);
ps->state = PS_PendingDisconnect;
neon_shard_log_cs(shard_no, LOG, "pageserver_receive disconnect: unexpected PQgetCopyData return value: %d", rc);
}
ps->nresponses_received++;
@@ -1081,7 +816,8 @@ communicator_send_request(int shard, NeonCommunicatorRequest* req)
/* ring overflow should not happen */
Assert(chan->requests[ring_pos].hdr.u.reqid == 0);
req->hdr.u.recepient.procno = MyProcNumber;
req->hdr.u.recepient.procno = MyProcNumber + 1; /* make it non-zero */
Assert(req->hdr.u.reqid != 0);
/* copy request */
chan->requests[ring_pos] = *req;
@@ -1089,7 +825,7 @@ communicator_send_request(int shard, NeonCommunicatorRequest* req)
/* will be overwritten with response code when request will be processed */
communicator->responses[MyProcNumber].tag = req->hdr.tag;
/* enforce memory battier before pinging communicator */
/* enforce memory barrier before pinging communicator */
pg_write_barrier();
/* advance read-up-to position */
@@ -1120,7 +856,6 @@ communicator_receive_response(void)
{
elog(ERROR, "Request failed"); /* detailed error message is printed by communicator */
}
elog(LOG, "Backend %d receive response %d from communicator", MyProcNumber, communicator->responses[MyProcNumber].tag);
return communicator->responses[MyProcNumber].value;
}
@@ -1249,7 +984,7 @@ pg_init_libpagestore(void)
"number of connections to each shard",
NULL,
&parallel_connections,
1, 1, 16,
10, 1, 100,
PGC_POSTMASTER,
0, /* no flags required */
NULL, NULL, NULL);
@@ -1317,6 +1052,7 @@ communicator_write_loop(void* arg)
while (true)
{
NeonCommunicatorRequest* req;
uint64 read_end_pos;
/* Number of shards is decreased so this worker is not needed any more */
if (chan_no >= shard_map.n_shards * parallel_connections)
@@ -1324,24 +1060,30 @@ communicator_write_loop(void* arg)
neon_shard_log_cs(shard_no, LOG, "Shard %d is not online any more (n_shards=%d)", (int)shard_no, (int)shard_map.n_shards);
return NULL;
}
while (true)
read_end_pos = pg_atomic_read_u64(&chan->read_pos);
Assert(read_start_pos <= read_end_pos);
if (read_start_pos == read_end_pos) /* fast path */
{
uint64 read_end_pos = pg_atomic_read_u64(&chan->read_pos);
Assert(read_start_pos <= read_end_pos);
if (read_start_pos < read_end_pos)
{
break;
}
pthread_mutex_lock(&chan->mutex);
pthread_cond_wait(&chan->cond, &chan->mutex);
while (!ShutdownRequestPending)
{
read_end_pos = pg_atomic_read_u64(&chan->read_pos);
Assert(read_start_pos <= read_end_pos);
if (read_start_pos < read_end_pos)
{
pthread_mutex_unlock(&chan->mutex);
break;
}
pthread_cond_wait(&chan->cond, &chan->mutex);
}
pthread_mutex_unlock(&chan->mutex);
if (ShutdownRequestPending)
return NULL;
}
req = &chan->requests[read_start_pos++ % ring_size];
Assert(req->hdr.u.reqid != 0);
nm_pack_request(&s, &req->hdr);
Assert(s.maxlen == MAX_REQUEST_SIZE); /* string buffer was not reallocated */
elog(LOG, "Send request %d from %d to PS", req->hdr.tag, req->hdr.u.recepient.procno);
req->hdr.u.reqid = 0; /* mark requests as processed */
pageserver_send(chan_no, &s);
}
@@ -1372,7 +1114,6 @@ communicator_read_loop(void* arg)
pg_usleep(RECEIVER_RETRY_DELAY_USEC);
continue;
}
elog(LOG, "Receive response %d from PS to %d channel %d", resp->tag, resp->u.recepient.procno, (int)chan_no);
notify_backend = true;
switch (resp->tag)
{
@@ -1428,11 +1169,11 @@ communicator_read_loop(void* arg)
}
if (notify_backend)
{
communicator->responses[resp->u.recepient.procno].value = value;
communicator->responses[resp->u.recepient.procno-1].value = value;
/* enforce write barrier before writing response code which is used as received response indicator */
pg_write_barrier();
communicator->responses[resp->u.recepient.procno].tag = resp->tag;
SetLatch(&ProcGlobal->allProcs[resp->u.recepient.procno].procLatch);
communicator->responses[resp->u.recepient.procno-1].tag = resp->tag;
SetLatch(&ProcGlobal->allProcs[resp->u.recepient.procno-1].procLatch);
}
free(resp);
}

View File

@@ -122,7 +122,10 @@ static BlockNumber neon_nblocks(SMgrRelation reln, ForkNumber forknum);
void
nm_pack_request(StringInfo s, NeonRequest *msg)
{
int msg_len;
resetStringInfo(s);
pq_sendbyte(s, 'd'); /* copy data */
pq_sendint32(s, 0); /* message length - will be filled later */
pq_sendbyte(s, msg->tag);
pq_sendint64(s, msg->u.reqid);
pq_sendint64(s, msg->lsn);
@@ -195,6 +198,11 @@ nm_pack_request(StringInfo s, NeonRequest *msg)
neon_log(ERROR, "unexpected neon message tag 0x%02x", msg->tag);
break;
}
/* write message length */
msg_len = s->len;
s->len = 1;
pq_sendint32(s, msg_len - 1); /* exclude tag */
s->len = msg_len;
}
/*
@@ -310,11 +318,8 @@ nm_unpack_response(StringInfo s)
int n_blocks;
msg_resp = memalloc(sizeof(NeonGetSlruSegmentResponse));
if (neon_protocol_version >= 3)
{
msg_resp->req.kind = pq_getmsgbyte(s);
msg_resp->req.segno = pq_getmsgint(s, 4);
}
msg_resp->req.kind = pq_getmsgbyte(s);
msg_resp->req.segno = pq_getmsgint(s, 4);
msg_resp->req.hdr = resp_hdr;
n_blocks = pq_getmsgint(s, 4);
@@ -337,6 +342,7 @@ nm_unpack_response(StringInfo s)
case T_NeonDbSizeRequest:
case T_NeonGetSlruSegmentRequest:
default:
Assert(false);
break;
}