mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 13:32:57 +00:00
Retry attempt to connect to pageserver in order to make pageserver restart transparent for clients (#3700)
…start transparent for clients ## Describe your changes Try to reestablish connection with pageserver if send is failed to be able to make pageserver restart transparent for client ## Issue ticket number and link https://github.com/neondatabase/neon/issues/1138 ## Checklist before requesting a review - [ ] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section. --------- Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
This commit is contained in:
committed by
GitHub
parent
b80fe41af3
commit
a34e78d084
@@ -32,6 +32,9 @@
|
||||
|
||||
#define PageStoreTrace DEBUG5
|
||||
|
||||
#define MAX_RECONNECT_ATTEMPTS 5
|
||||
#define RECONNECT_INTERVAL_USEC 1000000
|
||||
|
||||
bool connected = false;
|
||||
PGconn *pageserver_conn = NULL;
|
||||
|
||||
@@ -52,8 +55,8 @@ int readahead_buffer_size = 128;
|
||||
|
||||
static void pageserver_flush(void);
|
||||
|
||||
static void
|
||||
pageserver_connect()
|
||||
static bool
|
||||
pageserver_connect(int elevel)
|
||||
{
|
||||
char *query;
|
||||
int ret;
|
||||
@@ -69,10 +72,11 @@ pageserver_connect()
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
|
||||
ereport(ERROR,
|
||||
ereport(elevel,
|
||||
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
|
||||
errmsg(NEON_TAG "could not establish connection to pageserver"),
|
||||
errdetail_internal("%s", msg)));
|
||||
return false;
|
||||
}
|
||||
|
||||
query = psprintf("pagestream %s %s", neon_tenant, neon_timeline);
|
||||
@@ -81,7 +85,8 @@ pageserver_connect()
|
||||
{
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
neon_log(ERROR, "could not send pagestream command to pageserver");
|
||||
neon_log(elevel, "could not send pagestream command to pageserver");
|
||||
return false;
|
||||
}
|
||||
|
||||
pageserver_conn_wes = CreateWaitEventSet(TopMemoryContext, 3);
|
||||
@@ -113,8 +118,9 @@ pageserver_connect()
|
||||
FreeWaitEventSet(pageserver_conn_wes);
|
||||
pageserver_conn_wes = NULL;
|
||||
|
||||
neon_log(ERROR, "could not complete handshake with pageserver: %s",
|
||||
neon_log(elevel, "could not complete handshake with pageserver: %s",
|
||||
msg);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -122,6 +128,7 @@ pageserver_connect()
|
||||
neon_log(LOG, "libpagestore: connected to '%s'", page_server_connstring_raw);
|
||||
|
||||
connected = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -149,8 +156,11 @@ retry:
|
||||
if (event.events & WL_SOCKET_READABLE)
|
||||
{
|
||||
if (!PQconsumeInput(pageserver_conn))
|
||||
neon_log(ERROR, "could not get response from pageserver: %s",
|
||||
{
|
||||
neon_log(LOG, "could not get response from pageserver: %s",
|
||||
PQerrorMessage(pageserver_conn));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
goto retry;
|
||||
@@ -190,31 +200,62 @@ static void
|
||||
pageserver_send(NeonRequest * request)
|
||||
{
|
||||
StringInfoData req_buff;
|
||||
int n_reconnect_attempts = 0;
|
||||
|
||||
/* If the connection was lost for some reason, reconnect */
|
||||
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
|
||||
pageserver_disconnect();
|
||||
|
||||
if (!connected)
|
||||
pageserver_connect();
|
||||
|
||||
req_buff = nm_pack_request(request);
|
||||
|
||||
/*
|
||||
* Send request.
|
||||
*
|
||||
* In principle, this could block if the output buffer is full, and we
|
||||
* should use async mode and check for interrupts while waiting. In
|
||||
* practice, our requests are small enough to always fit in the output and
|
||||
* TCP buffer.
|
||||
* If pageserver is stopped, the connections from compute node are broken.
|
||||
* The compute node doesn't notice that immediately, but it will cause the next request to fail, usually on the next query.
|
||||
* That causes user-visible errors if pageserver is restarted, or the tenant is moved from one pageserver to another.
|
||||
* See https://github.com/neondatabase/neon/issues/1138
|
||||
* So try to reestablish connection in case of failure.
|
||||
*/
|
||||
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
|
||||
while (true)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
if (!connected)
|
||||
{
|
||||
if (!pageserver_connect(n_reconnect_attempts < MAX_RECONNECT_ATTEMPTS ? LOG : ERROR))
|
||||
{
|
||||
n_reconnect_attempts += 1;
|
||||
pg_usleep(RECONNECT_INTERVAL_USEC);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
pageserver_disconnect();
|
||||
neon_log(ERROR, "failed to send page request: %s", msg);
|
||||
/*
|
||||
* Send request.
|
||||
*
|
||||
* In principle, this could block if the output buffer is full, and we
|
||||
* should use async mode and check for interrupts while waiting. In
|
||||
* practice, our requests are small enough to always fit in the output and
|
||||
* TCP buffer.
|
||||
*/
|
||||
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
if (n_reconnect_attempts < MAX_RECONNECT_ATTEMPTS)
|
||||
{
|
||||
neon_log(LOG, "failed to send page request (try to reconnect): %s", msg);
|
||||
if (n_reconnect_attempts != 0) /* do not sleep before first reconnect attempt, assuming that pageserver is already restarted */
|
||||
pg_usleep(RECONNECT_INTERVAL_USEC);
|
||||
n_reconnect_attempts += 1;
|
||||
continue;
|
||||
}
|
||||
else
|
||||
{
|
||||
pageserver_disconnect();
|
||||
neon_log(ERROR, "failed to send page request: %s", msg);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
pfree(req_buff.data);
|
||||
|
||||
n_unflushed_requests++;
|
||||
|
||||
@@ -45,14 +45,6 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
# Stopping the pageserver breaks the connection from the postgres backend to
|
||||
# the page server, and causes the next query on the connection to fail. Start a new
|
||||
# postgres connection too, to avoid that error. (Ideally, the compute node would
|
||||
# handle that and retry internally, without propagating the error to the user, but
|
||||
# currently it doesn't...)
|
||||
pg_conn = pg.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
cur.execute("SELECT count(*) FROM foo")
|
||||
assert cur.fetchone() == (100000,)
|
||||
|
||||
@@ -70,8 +62,6 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
|
||||
assert tenant_status["state"] == "Loading"
|
||||
|
||||
# Try to read. This waits until the loading finishes, and then return normally.
|
||||
pg_conn = pg.connect()
|
||||
cur = pg_conn.cursor()
|
||||
cur.execute("SELECT count(*) FROM foo")
|
||||
assert cur.fetchone() == (100000,)
|
||||
|
||||
@@ -132,14 +122,6 @@ def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder):
|
||||
env.pageserver.stop(immediate=True)
|
||||
env.pageserver.start()
|
||||
|
||||
# Stopping the pageserver breaks the connection from the postgres backend to
|
||||
# the page server, and causes the next query on the connection to fail. Start a new
|
||||
# postgres connection too, to avoid that error. (Ideally, the compute node would
|
||||
# handle that and retry internally, without propagating the error to the user, but
|
||||
# currently it doesn't...)
|
||||
pg_conn = pg.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
# Check that all the updates are visible
|
||||
num_updates = pg.safe_psql("SELECT sum(updates) FROM foo")[0][0]
|
||||
assert num_updates == i * 100000
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
# This test spawns pgbench in a thread in the background and concurrently restarts pageserver,
|
||||
# checking how client is able to transparently restore connection to pageserver
|
||||
#
|
||||
import threading
|
||||
import time
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, PgBin, Postgres
|
||||
|
||||
|
||||
# Test restarting page server, while safekeeper and compute node keep
|
||||
# running.
|
||||
def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgBin):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_pageserver_restarts")
|
||||
pg = env.postgres.create_start("test_pageserver_restarts")
|
||||
n_restarts = 10
|
||||
scale = 10
|
||||
|
||||
def run_pgbench(pg: Postgres):
|
||||
connstr = pg.connstr()
|
||||
log.info(f"Start a pgbench workload on pg {connstr}")
|
||||
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
|
||||
pg_bin.run_capture(["pgbench", f"-T{n_restarts}", connstr])
|
||||
|
||||
thread = threading.Thread(target=run_pgbench, args=(pg,), daemon=True)
|
||||
thread.start()
|
||||
|
||||
for i in range(n_restarts):
|
||||
# Stop the pageserver gracefully and restart it.
|
||||
time.sleep(1)
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
thread.join()
|
||||
Reference in New Issue
Block a user