From bc07358034d0b1d4a21d055b3be350cdd2fe70b3 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 19 Feb 2025 16:59:07 +0200 Subject: [PATCH] First working version --- pgxn/neon/libpagestore.c | 671 ++++++++++++------------------------- pgxn/neon/pagestore_smgr.c | 16 +- 2 files changed, 217 insertions(+), 470 deletions(-) diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 5ff53a2327..3426ac722a 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -13,6 +13,7 @@ *------------------------------------------------------------------------- */ #include +#include #include "postgres.h" @@ -65,7 +66,7 @@ char *page_server_connstring; char *neon_auth_token; int max_prefetch_distance = 128; -int parallel_connections = 1; +int parallel_connections = 10; int neon_protocol_version = 3; @@ -111,10 +112,8 @@ int MyProcNumber; typedef enum PSConnectionState { PS_Disconnected, /* no connection yet */ - PS_Connecting_Startup, /* connection starting up */ - PS_Connecting_PageStream, /* negotiating pagestream */ PS_Connected, /* connected, pagestream established */ - PS_Expired, /* connection should be reestablished */ + PS_PendingDisconnect, /* connection should be reestablished */ } PSConnectionState; /* This backend's per-shard connections */ @@ -138,14 +137,6 @@ typedef struct /* request / response counters for debugging */ uint64 nrequests_sent; uint64 nresponses_received; - - /*--- - * WaitEventSet containing: - * - WL_SOCKET_READABLE on 'conn' - * - WL_LATCH_SET on MyLatch, and - * - WL_EXIT_ON_PM_DEATH. - */ - WaitEventSet *wes_read; } PageServer; static PageServer* page_servers; @@ -174,7 +165,7 @@ log_error_message(NeonErrorResponse* err) { int save_pid = MyProcPid; pthread_mutex_lock(&mutex); - MyProcPid = ProcGlobal->allProcs[err->req.u.recepient.procno].pid; + MyProcPid = ProcGlobal->allProcs[err->req.u.recepient.procno-1].pid; neon_log(LOG, "Server returns error for request %d: %s", err->req.tag, err->message); MyProcPid = save_pid; pthread_mutex_unlock(&mutex); @@ -282,7 +273,7 @@ AssignPageserverConnstring(const char *newval, void *extra) if (page_servers[i].state == PS_Connected) { /* TODO: race condition */ - page_servers[i].state = PS_Expired; + page_servers[i].state = PS_PendingDisconnect; } } @@ -293,6 +284,8 @@ AssignPageserverConnstring(const char *newval, void *extra) void* chan_no = (void*)i; pthread_create(&writer, NULL, communicator_write_loop, chan_no); pthread_create(&reader, NULL, communicator_read_loop, chan_no); + pthread_detach(reader); + pthread_detach(writer); } } @@ -318,13 +311,6 @@ get_shard_number(NRelFileInfo rinfo, BlockNumber blocknum) static void cleanup_and_disconnect(PageServer *ps) { - pthread_mutex_lock(&mutex); - - if (ps->wes_read) - { - FreeWaitEventSet(ps->wes_read); - ps->wes_read = NULL; - } if (ps->conn) { MyNeonCounters->pageserver_disconnects_total++; @@ -333,8 +319,6 @@ cleanup_and_disconnect(PageServer *ps) } ps->state = PS_Disconnected; - - pthread_mutex_unlock(&mutex); } /* @@ -351,6 +335,90 @@ chomp(char const* in) return strndup(in, n); } + +/* + * TODO: we have to defined this pq_send/pq_poll function to perform raw write to non-blocking socket. + * Is there and bette/portable/simpler way in :ostgres to do it? + */ +#if PG_VERSION_NUM < 170000 +static int +PQsocketPoll(int sock, int forRead, int forWrite, time_t end_time) +{ + struct pollfd input_fd; + int timeout_ms; + + if (!forRead && !forWrite) + return 0; + + input_fd.fd = sock; + input_fd.events = POLLERR; + input_fd.revents = 0; + + if (forRead) + input_fd.events |= POLLIN; + if (forWrite) + input_fd.events |= POLLOUT; + + /* Compute appropriate timeout interval */ + if (end_time == ((time_t) -1)) + timeout_ms = -1; + else + { + time_t now = time(NULL); + + if (end_time > now) + timeout_ms = (end_time - now) * 1000; + else + timeout_ms = 0; + } + + return poll(&input_fd, 1, timeout_ms); +} +#endif + +static int +pq_poll(PGconn *conn, int forRead, int forWrite, time_t end_time) +{ + int result; + do + { + result = PQsocketPoll(PQsocket(conn), forRead, forWrite, end_time); + } + while (result < 0 && errno == EINTR); + return result; +} + +static ssize_t +pq_send(PGconn *conn, const void *ptr, size_t len) +{ + size_t offs = 0; + while (offs < len) + { + ssize_t rc = send(PQsocket(conn), (char*)ptr + offs, len - offs, 0); + if (rc < 0) + { + /* Anything except EAGAIN/EWOULDBLOCK/EINTR is trouble */ + switch (errno) + { + case EAGAIN: + break; + case EINTR: + continue; + default: + return rc; + } + rc = pq_poll(conn, false, true, -1); + if (rc < 0) + { + return rc; + } + } else { + offs += rc; + } + } + return offs; +} + /* * Connect to a pageserver, or continue to try to connect if we're yet to * complete the connection (e.g. due to receiving an earlier cancellation @@ -367,415 +435,103 @@ pageserver_connect(int chan_no, int elevel) char pagestream_query[MAX_PS_QUERY_LENGTH]; int shard_no = CHAN_TO_SHARD(chan_no); char* connstr = shard_map.connstring[shard_no]; + const char *keywords[3]; + const char *values[3]; + int n_pgsql_params; + TimestampTz now; + int64 us_since_last_attempt; + PGresult* res; - switch (ps->state) - { - case PS_Disconnected: - { - const char *keywords[3]; - const char *values[3]; - int n_pgsql_params; - TimestampTz now; - int64 us_since_last_attempt; + /* Make sure we start with a clean slate */ + cleanup_and_disconnect(ps); - /* Make sure we start with a clean slate */ - cleanup_and_disconnect(ps); + neon_shard_log_cs(shard_no, DEBUG5, "Connection state: Disconnected"); - neon_shard_log_cs(shard_no, DEBUG5, "Connection state: Disconnected"); + now = GetCurrentTimestamp(); + us_since_last_attempt = (int64) (now - ps->last_reconnect_time); + ps->last_reconnect_time = now; - now = GetCurrentTimestamp(); - us_since_last_attempt = (int64) (now - ps->last_reconnect_time); - ps->last_reconnect_time = now; - - /* - * Make sure we don't do exponential backoff with a constant multiplier - * of 0 us, as that doesn't really do much for timeouts... - * - * cf. https://github.com/neondatabase/neon/issues/7897 - */ - if (ps->delay_us == 0) - ps->delay_us = MIN_RECONNECT_INTERVAL_USEC; - - /* - * If we did other tasks between reconnect attempts, then we won't - * need to wait as long as a full delay. - */ - if (us_since_last_attempt < ps->delay_us) - { - pg_usleep(ps->delay_us - us_since_last_attempt); - } - - /* update the delay metric */ - ps->delay_us = Min(ps->delay_us * 2, MAX_RECONNECT_INTERVAL_USEC); - - /* - * Connect using the connection string we got from the - * neon.pageserver_connstring GUC. If the NEON_AUTH_TOKEN environment - * variable was set, use that as the password. - * - * The connection options are parsed in the order they're given, so when - * we set the password before the connection string, the connection string - * can override the password from the env variable. Seems useful, although - * we don't currently use that capability anywhere. - */ - keywords[0] = "dbname"; - values[0] = connstr; - n_pgsql_params = 1; - - if (neon_auth_token) - { - keywords[1] = "password"; - values[1] = neon_auth_token; - n_pgsql_params++; - } - - keywords[n_pgsql_params] = NULL; - values[n_pgsql_params] = NULL; - - ps->conn = PQconnectStartParams(keywords, values, 1); - if (PQstatus(ps->conn) == CONNECTION_BAD) - { - char *msg = chomp(PQerrorMessage(ps->conn)); - cleanup_and_disconnect(ps); - ereport(elevel, - (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), - errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", chan_no), - errdetail_internal("%s", msg))); - free(msg); - return false; - } - ps->state = PS_Connecting_Startup; - } - /* FALLTHROUGH */ - case PS_Connecting_Startup: - { - int ps_send_query_ret; - bool connected = false; - int poll_result = PGRES_POLLING_WRITING; - neon_shard_log_cs(shard_no, DEBUG5, "Connection state: Connecting_Startup"); - - do - { - switch (poll_result) - { - default: /* unknown/unused states are handled as a failed connection */ - case PGRES_POLLING_FAILED: - { - char *pqerr = PQerrorMessage(ps->conn); - char *msg = NULL; - neon_shard_log_cs(shard_no, DEBUG5, "POLLING_FAILED"); - - if (pqerr) - msg = chomp(pqerr); - - cleanup_and_disconnect(ps); - - if (msg) - { - neon_shard_log_cs(shard_no, elevel, - "could not connect to pageserver: %s", - msg); - free(msg); - } - else - neon_shard_log_cs(shard_no, elevel, - "could not connect to pageserver"); - - return false; - } - case PGRES_POLLING_READING: - /* Sleep until there's something to do */ - while (true) - { - int rc = WaitLatchOrSocket(MyLatch, - WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | WL_SOCKET_READABLE, - PQsocket(ps->conn), - 0, - WAIT_EVENT_NEON_PS_STARTING); - if (rc & WL_LATCH_SET) - { - ResetLatch(MyLatch); - /* query cancellation, backend shutdown */ - CHECK_FOR_INTERRUPTS(); - } - if (rc & WL_SOCKET_READABLE) - break; - } - /* PQconnectPoll() handles the socket polling state updates */ - - break; - case PGRES_POLLING_WRITING: - /* Sleep until there's something to do */ - while (true) - { - int rc = WaitLatchOrSocket(MyLatch, - WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | WL_SOCKET_WRITEABLE, - PQsocket(ps->conn), - 0, - WAIT_EVENT_NEON_PS_STARTING); - if (rc & WL_LATCH_SET) - { - ResetLatch(MyLatch); - /* query cancellation, backend shutdown */ - CHECK_FOR_INTERRUPTS(); - } - if (rc & WL_SOCKET_WRITEABLE) - break; - } - /* PQconnectPoll() handles the socket polling state updates */ - - break; - case PGRES_POLLING_OK: - neon_shard_log_cs(shard_no, DEBUG5, "POLLING_OK"); - connected = true; - break; - } - poll_result = PQconnectPoll(ps->conn); - } - while (!connected); - - /* No more polling needed; connection succeeded */ - ps->last_connect_time = GetCurrentTimestamp(); - - /* Allocate wait event set in critical section */ - pthread_mutex_lock(&mutex); -#if PG_MAJORVERSION_NUM >= 17 - ps->wes_read = CreateWaitEventSet(NULL, 3); -#else - ps->wes_read = CreateWaitEventSet(TopMemoryContext, 3); -#endif - AddWaitEventToSet(ps->wes_read, WL_LATCH_SET, PGINVALID_SOCKET, - MyLatch, NULL); - AddWaitEventToSet(ps->wes_read, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, - NULL, NULL); - AddWaitEventToSet(ps->wes_read, WL_SOCKET_READABLE, PQsocket(ps->conn), NULL, NULL); - pthread_mutex_unlock(&mutex); - - - switch (neon_protocol_version) - { - case 3: - snprintf(pagestream_query, sizeof pagestream_query, "pagestream_v3 %s %s", neon_tenant, neon_timeline); - break; - case 2: - snprintf(pagestream_query, sizeof pagestream_query, "pagestream_v2 %s %s", neon_tenant, neon_timeline); - break; - default: - neon_shard_log_cs(shard_no, ERROR, "unexpected neon_protocol_version %d", neon_protocol_version); - } - - if (PQstatus(ps->conn) == CONNECTION_BAD) - { - char *msg = chomp(PQerrorMessage(ps->conn)); - - cleanup_and_disconnect(ps); - - ereport(elevel, - (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), - errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", chan_no), - errdetail_internal("%s", msg))); - free(msg); - return false; - } - - ps_send_query_ret = PQsendQuery(ps->conn, pagestream_query); - if (ps_send_query_ret != 1) - { - cleanup_and_disconnect(ps); - - neon_shard_log_cs(shard_no, elevel, "could not send pagestream command to pageserver"); - return false; - } - - ps->state = PS_Connecting_PageStream; - } - /* FALLTHROUGH */ - case PS_Connecting_PageStream: - { - neon_shard_log_cs(shard_no, DEBUG5, "Connection state: Connecting_PageStream"); - - if (PQstatus(ps->conn) == CONNECTION_BAD) - { - char *msg = chomp(PQerrorMessage(ps->conn)); - cleanup_and_disconnect(ps); - ereport(elevel, - (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), - errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", chan_no), - errdetail_internal("%s", msg))); - free(msg); - return false; - } - - while (PQisBusy(ps->conn)) - { - WaitEvent event; - - /* Sleep until there's something to do */ - (void) WaitEventSetWait(ps->wes_read, -1L, &event, 1, - WAIT_EVENT_NEON_PS_CONFIGURING); - ResetLatch(MyLatch); - - CHECK_FOR_INTERRUPTS(); - - /* Data available in socket? */ - if (event.events & WL_SOCKET_READABLE) - { - if (!PQconsumeInput(ps->conn)) - { - char *msg = chomp(PQerrorMessage(ps->conn)); - - cleanup_and_disconnect(ps); - neon_shard_log_cs(shard_no, elevel, "could not complete handshake with pageserver: %s", - msg); - free(msg); - return false; - } - } - } - - ps->state = PS_Connected; - ps->nrequests_sent = 0; - ps->nresponses_received = 0; - } - /* FALLTHROUGH */ - case PS_Connected: - /* - * We successfully connected. Future connections to this PageServer - * will do fast retries again, with exponential backoff. - */ + /* + * Make sure we don't do exponential backoff with a constant multiplier + * of 0 us, as that doesn't really do much for timeouts... + * + * cf. https://github.com/neondatabase/neon/issues/7897 + */ + if (ps->delay_us == 0) ps->delay_us = MIN_RECONNECT_INTERVAL_USEC; - neon_shard_log_cs(shard_no, LOG, "libpagestore: connected to '%s' with protocol version %d", connstr, neon_protocol_version); - return true; - default: - neon_shard_log_cs(shard_no, ERROR, "libpagestore: invalid connection state %d", ps->state); + /* + * If we did other tasks between reconnect attempts, then we won't + * need to wait as long as a full delay. + */ + if (us_since_last_attempt < ps->delay_us) + { + pg_usleep(ps->delay_us - us_since_last_attempt); } - /* This shouldn't be hit */ - Assert(false); -} -/* - * A wrapper around PQgetCopyData that checks for interrupts while sleeping. - */ -static int -call_PQgetCopyData(int chan_no, char **buffer) -{ - int ret; - PageServer *ps = &page_servers[chan_no]; - PGconn *pageserver_conn = ps->conn; - instr_time now, - start_ts, - since_start, - last_log_ts, - since_last_log; - bool logged = false; - int shard_no = CHAN_TO_SHARD(chan_no); + /* update the delay metric */ + ps->delay_us = Min(ps->delay_us * 2, MAX_RECONNECT_INTERVAL_USEC); /* - * As a debugging aid, if we don't get a response for a long time, print a - * log message. + * Connect using the connection string we got from the + * neon.pageserver_connstring GUC. If the NEON_AUTH_TOKEN environment + * variable was set, use that as the password. * - * 10 s is a very generous threshold, normally we expect a response in a - * few milliseconds. We have metrics to track latencies in normal ranges, - * but in the cases that take exceptionally long, it's useful to log the - * exact timestamps. + * The connection options are parsed in the order they're given, so when + * we set the password before the connection string, the connection string + * can override the password from the env variable. Seems useful, although + * we don't currently use that capability anywhere. */ -#define LOG_INTERVAL_MS INT64CONST(10 * 1000) + keywords[0] = "dbname"; + values[0] = connstr; + n_pgsql_params = 1; - INSTR_TIME_SET_CURRENT(now); - start_ts = last_log_ts = now; - INSTR_TIME_SET_ZERO(since_last_log); - -retry: - ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ ); - - if (ret == 0) + if (neon_auth_token) { - WaitEvent event; - long timeout; - - timeout = Max(0, LOG_INTERVAL_MS - INSTR_TIME_GET_MILLISEC(since_last_log)); - - /* Sleep until there's something to do */ - (void) WaitEventSetWait(ps->wes_read, timeout, &event, 1, - WAIT_EVENT_NEON_PS_READ); - ResetLatch(MyLatch); - - CHECK_FOR_INTERRUPTS(); - - /* Data available in socket? */ - if (event.events & WL_SOCKET_READABLE) - { - if (!PQconsumeInput(pageserver_conn)) - { - char *msg = chomp(PQerrorMessage(pageserver_conn)); - - neon_shard_log_cs(shard_no, LOG, "could not get response from pageserver: %s", msg); - free(msg); - return -1; - } - } - - /* - * Print a message to the log if a long time has passed with no - * response. - */ - INSTR_TIME_SET_CURRENT(now); - since_last_log = now; - INSTR_TIME_SUBTRACT(since_last_log, last_log_ts); - if (INSTR_TIME_GET_MILLISEC(since_last_log) >= LOG_INTERVAL_MS) - { - int sndbuf = -1; - int recvbuf = -1; -#ifdef __linux__ - int socketfd; -#endif - - since_start = now; - INSTR_TIME_SUBTRACT(since_start, start_ts); - -#ifdef __linux__ - /* - * get kernel's send and recv queue size via ioctl - * https://elixir.bootlin.com/linux/v6.1.128/source/include/uapi/linux/sockios.h#L25-L27 - */ - socketfd = PQsocket(pageserver_conn); - if (socketfd != -1) { - int ioctl_err; - ioctl_err = ioctl(socketfd, SIOCOUTQ, &sndbuf); - if (ioctl_err!= 0) { - sndbuf = -errno; - } - ioctl_err = ioctl(socketfd, FIONREAD, &recvbuf); - if (ioctl_err != 0) { - recvbuf = -errno; - } - } -#endif - neon_shard_log_cs(shard_no, LOG, "no response received from pageserver for %0.3f s, still waiting (sent " UINT64_FORMAT " requests, received " UINT64_FORMAT " responses) (socket sndbuf=%d recvbuf=%d)", - INSTR_TIME_GET_DOUBLE(since_start), - ps->nrequests_sent, ps->nresponses_received, sndbuf, recvbuf); - last_log_ts = now; - logged = true; - } - - goto retry; + keywords[1] = "password"; + values[1] = neon_auth_token; + n_pgsql_params++; } - /* - * If we logged earlier that the response is taking a long time, log - * another message when the response is finally received. - */ - if (logged) - { - INSTR_TIME_SET_CURRENT(now); - since_start = now; - INSTR_TIME_SUBTRACT(since_start, start_ts); - neon_shard_log_cs(shard_no, LOG, "received response from pageserver after %0.3f s", - INSTR_TIME_GET_DOUBLE(since_start)); - } + keywords[n_pgsql_params] = NULL; + values[n_pgsql_params] = NULL; - return ret; + ps->conn = PQconnectdbParams(keywords, values, 1); + if (PQstatus(ps->conn) == CONNECTION_BAD) + { + char *msg = chomp(PQerrorMessage(ps->conn)); + cleanup_and_disconnect(ps); + ereport(elevel, + (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), + errmsg(NEON_TAG "[shard %d] could not establish connection to pageserver", chan_no), + errdetail_internal("%s", msg))); + free(msg); + return false; + } + ps->last_connect_time = GetCurrentTimestamp(); + snprintf(pagestream_query, sizeof pagestream_query, "pagestream_v%d %s %s", neon_protocol_version, neon_tenant, neon_timeline); + + res = PQexec(ps->conn, pagestream_query); + if (PQresultStatus(res) != PGRES_COPY_BOTH) + { + char *msg = chomp(PQerrorMessage(ps->conn)); + cleanup_and_disconnect(ps); + ereport(elevel, + (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), + errmsg(NEON_TAG "[shard %d] could perform handshake with pageserver", chan_no), + errdetail_internal("%s", msg))); + free(msg); + return false; + } + PQclear(res); + + ps->state = PS_Connected; + ps->nrequests_sent = 0; + ps->nresponses_received = 0; + ps->delay_us = MIN_RECONNECT_INTERVAL_USEC; + + neon_shard_log_cs(shard_no, LOG, "libpagestore: connected to '%s' with protocol version %d", connstr, neon_protocol_version); + return true; } /* @@ -827,7 +583,7 @@ pageserver_send(int chan_no, StringInfo msg) */ if (ps->state != PS_Connected) { - if (ps->state == PS_Expired) + if (ps->state == PS_PendingDisconnect) { neon_shard_log_cs(shard_no, LOG, "pageserver_send disconnect expired connection"); pageserver_disconnect(chan_no); @@ -847,37 +603,17 @@ pageserver_send(int chan_no, StringInfo msg) /* * Send request. * - * In principle, this could block if the output buffer is full, and we - * should use async mode and check for interrupts while waiting. In - * practice, our requests are small enough to always fit in the output and - * TCP buffer. - * - * Note that this also will fail when the connection is in the - * PGRES_POLLING_WRITING state. It's kinda dirty to disconnect at this - * point, but on the grand scheme of things it's only a small issue. + * We can not use PQputCopyData because it tries to read input messages. + * So we have to do it "manually". */ - ps->nrequests_sent++; - if (PQputCopyData(pageserver_conn, msg->data, msg->len) <= 0) + if (pq_send(pageserver_conn, msg->data, msg->len) < 0) { char *msg = chomp(PQerrorMessage(pageserver_conn)); - pageserver_disconnect(chan_no); neon_shard_log_cs(shard_no, LOG, "pageserver_send disconnected: failed to send page request (try to reconnect): %s", msg); free(msg); return false; } - /* - * TODO: may be not flush each request? - */ - if (PQflush(pageserver_conn)) - { - char *msg = chomp(PQerrorMessage(pageserver_conn)); - - pageserver_disconnect(chan_no); - neon_shard_log_cs(shard_no, LOG, "pageserver_flush disconnect because failed to flush page requests: %s", msg); - free(msg); - return false; - } return true; } @@ -890,7 +626,7 @@ static NeonResponse * pageserver_receive(int chan_no) { StringInfoData resp_buff; - NeonResponse *resp; + NeonResponse *resp = NULL; PageServer *ps = &page_servers[chan_no]; PGconn *pageserver_conn = ps->conn; int shard_no = CHAN_TO_SHARD(chan_no); @@ -908,7 +644,8 @@ pageserver_receive(int chan_no) Assert(pageserver_conn); - rc = call_PQgetCopyData(chan_no, &resp_buff.data); + resp_buff.data = NULL; + rc = PQgetCopyData(pageserver_conn, &resp_buff.data, false); if (rc >= 0) { /* call_PQgetCopyData handles rc == 0 */ @@ -927,20 +664,18 @@ pageserver_receive(int chan_no) else if (rc == -1) { neon_shard_log_cs(shard_no, LOG, "pageserver_receive disconnect: psql end of copy data: %s", chomp(PQerrorMessage(pageserver_conn))); - pageserver_disconnect(chan_no); - resp = NULL; + ps->state = PS_PendingDisconnect; } else if (rc == -2) { char *msg = chomp(PQerrorMessage(pageserver_conn)); - - pageserver_disconnect(chan_no); - neon_shard_log_cs(shard_no, ERROR, "pageserver_receive disconnect: could not read COPY data: %s", msg); + ps->state = PS_PendingDisconnect; + neon_shard_log_cs(shard_no, LOG, "pageserver_receive disconnect: could not read COPY data: %s", msg); } else { - pageserver_disconnect(chan_no); - neon_shard_log_cs(shard_no, ERROR, "pageserver_receive disconnect: unexpected PQgetCopyData return value: %d", rc); + ps->state = PS_PendingDisconnect; + neon_shard_log_cs(shard_no, LOG, "pageserver_receive disconnect: unexpected PQgetCopyData return value: %d", rc); } ps->nresponses_received++; @@ -1081,7 +816,8 @@ communicator_send_request(int shard, NeonCommunicatorRequest* req) /* ring overflow should not happen */ Assert(chan->requests[ring_pos].hdr.u.reqid == 0); - req->hdr.u.recepient.procno = MyProcNumber; + req->hdr.u.recepient.procno = MyProcNumber + 1; /* make it non-zero */ + Assert(req->hdr.u.reqid != 0); /* copy request */ chan->requests[ring_pos] = *req; @@ -1089,7 +825,7 @@ communicator_send_request(int shard, NeonCommunicatorRequest* req) /* will be overwritten with response code when request will be processed */ communicator->responses[MyProcNumber].tag = req->hdr.tag; - /* enforce memory battier before pinging communicator */ + /* enforce memory barrier before pinging communicator */ pg_write_barrier(); /* advance read-up-to position */ @@ -1120,7 +856,6 @@ communicator_receive_response(void) { elog(ERROR, "Request failed"); /* detailed error message is printed by communicator */ } - elog(LOG, "Backend %d receive response %d from communicator", MyProcNumber, communicator->responses[MyProcNumber].tag); return communicator->responses[MyProcNumber].value; } @@ -1249,7 +984,7 @@ pg_init_libpagestore(void) "number of connections to each shard", NULL, ¶llel_connections, - 1, 1, 16, + 10, 1, 100, PGC_POSTMASTER, 0, /* no flags required */ NULL, NULL, NULL); @@ -1317,6 +1052,7 @@ communicator_write_loop(void* arg) while (true) { NeonCommunicatorRequest* req; + uint64 read_end_pos; /* Number of shards is decreased so this worker is not needed any more */ if (chan_no >= shard_map.n_shards * parallel_connections) @@ -1324,24 +1060,30 @@ communicator_write_loop(void* arg) neon_shard_log_cs(shard_no, LOG, "Shard %d is not online any more (n_shards=%d)", (int)shard_no, (int)shard_map.n_shards); return NULL; } - while (true) + read_end_pos = pg_atomic_read_u64(&chan->read_pos); + Assert(read_start_pos <= read_end_pos); + if (read_start_pos == read_end_pos) /* fast path */ { - uint64 read_end_pos = pg_atomic_read_u64(&chan->read_pos); - Assert(read_start_pos <= read_end_pos); - if (read_start_pos < read_end_pos) - { - break; - } pthread_mutex_lock(&chan->mutex); - pthread_cond_wait(&chan->cond, &chan->mutex); + while (!ShutdownRequestPending) + { + read_end_pos = pg_atomic_read_u64(&chan->read_pos); + Assert(read_start_pos <= read_end_pos); + if (read_start_pos < read_end_pos) + { + pthread_mutex_unlock(&chan->mutex); + break; + } + pthread_cond_wait(&chan->cond, &chan->mutex); + } pthread_mutex_unlock(&chan->mutex); if (ShutdownRequestPending) return NULL; } req = &chan->requests[read_start_pos++ % ring_size]; + Assert(req->hdr.u.reqid != 0); nm_pack_request(&s, &req->hdr); Assert(s.maxlen == MAX_REQUEST_SIZE); /* string buffer was not reallocated */ - elog(LOG, "Send request %d from %d to PS", req->hdr.tag, req->hdr.u.recepient.procno); req->hdr.u.reqid = 0; /* mark requests as processed */ pageserver_send(chan_no, &s); } @@ -1372,7 +1114,6 @@ communicator_read_loop(void* arg) pg_usleep(RECEIVER_RETRY_DELAY_USEC); continue; } - elog(LOG, "Receive response %d from PS to %d channel %d", resp->tag, resp->u.recepient.procno, (int)chan_no); notify_backend = true; switch (resp->tag) { @@ -1428,11 +1169,11 @@ communicator_read_loop(void* arg) } if (notify_backend) { - communicator->responses[resp->u.recepient.procno].value = value; + communicator->responses[resp->u.recepient.procno-1].value = value; /* enforce write barrier before writing response code which is used as received response indicator */ pg_write_barrier(); - communicator->responses[resp->u.recepient.procno].tag = resp->tag; - SetLatch(&ProcGlobal->allProcs[resp->u.recepient.procno].procLatch); + communicator->responses[resp->u.recepient.procno-1].tag = resp->tag; + SetLatch(&ProcGlobal->allProcs[resp->u.recepient.procno-1].procLatch); } free(resp); } diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index c5b282af4d..9e7df68d9f 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -122,7 +122,10 @@ static BlockNumber neon_nblocks(SMgrRelation reln, ForkNumber forknum); void nm_pack_request(StringInfo s, NeonRequest *msg) { + int msg_len; resetStringInfo(s); + pq_sendbyte(s, 'd'); /* copy data */ + pq_sendint32(s, 0); /* message length - will be filled later */ pq_sendbyte(s, msg->tag); pq_sendint64(s, msg->u.reqid); pq_sendint64(s, msg->lsn); @@ -195,6 +198,11 @@ nm_pack_request(StringInfo s, NeonRequest *msg) neon_log(ERROR, "unexpected neon message tag 0x%02x", msg->tag); break; } + /* write message length */ + msg_len = s->len; + s->len = 1; + pq_sendint32(s, msg_len - 1); /* exclude tag */ + s->len = msg_len; } /* @@ -310,11 +318,8 @@ nm_unpack_response(StringInfo s) int n_blocks; msg_resp = memalloc(sizeof(NeonGetSlruSegmentResponse)); - if (neon_protocol_version >= 3) - { - msg_resp->req.kind = pq_getmsgbyte(s); - msg_resp->req.segno = pq_getmsgint(s, 4); - } + msg_resp->req.kind = pq_getmsgbyte(s); + msg_resp->req.segno = pq_getmsgint(s, 4); msg_resp->req.hdr = resp_hdr; n_blocks = pq_getmsgint(s, 4); @@ -337,6 +342,7 @@ nm_unpack_response(StringInfo s) case T_NeonDbSizeRequest: case T_NeonGetSlruSegmentRequest: default: + Assert(false); break; }