From a34e78d0841d68b2f3eddfd4065e8b5f24052919 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 9 Mar 2023 22:15:46 +0200 Subject: [PATCH] Retry attempt to connect to pageserver in order to make pageserver restart transparent for clients (#3700) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …start transparent for clients ## Describe your changes Try to reestablish connection with pageserver if send is failed to be able to make pageserver restart transparent for client ## Issue ticket number and link https://github.com/neondatabase/neon/issues/1138 ## Checklist before requesting a review - [ ] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section. --------- Co-authored-by: Heikki Linnakangas --- pgxn/neon/libpagestore.c | 77 ++++++++++++++----- .../regress/test_pageserver_restart.py | 18 ----- ...test_pageserver_restarts_under_workload.py | 35 +++++++++ 3 files changed, 94 insertions(+), 36 deletions(-) create mode 100644 test_runner/regress/test_pageserver_restarts_under_workload.py diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 88e3a12d96..3fe6d38251 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -32,6 +32,9 @@ #define PageStoreTrace DEBUG5 +#define MAX_RECONNECT_ATTEMPTS 5 +#define RECONNECT_INTERVAL_USEC 1000000 + bool connected = false; PGconn *pageserver_conn = NULL; @@ -52,8 +55,8 @@ int readahead_buffer_size = 128; static void pageserver_flush(void); -static void -pageserver_connect() +static bool +pageserver_connect(int elevel) { char *query; int ret; @@ -69,10 +72,11 @@ pageserver_connect() PQfinish(pageserver_conn); pageserver_conn = NULL; - ereport(ERROR, + ereport(elevel, (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), errmsg(NEON_TAG "could not establish connection to pageserver"), errdetail_internal("%s", msg))); + return false; } query = psprintf("pagestream %s %s", neon_tenant, neon_timeline); @@ -81,7 +85,8 @@ pageserver_connect() { PQfinish(pageserver_conn); pageserver_conn = NULL; - neon_log(ERROR, "could not send pagestream command to pageserver"); + neon_log(elevel, "could not send pagestream command to pageserver"); + return false; } pageserver_conn_wes = CreateWaitEventSet(TopMemoryContext, 3); @@ -113,8 +118,9 @@ pageserver_connect() FreeWaitEventSet(pageserver_conn_wes); pageserver_conn_wes = NULL; - neon_log(ERROR, "could not complete handshake with pageserver: %s", + neon_log(elevel, "could not complete handshake with pageserver: %s", msg); + return false; } } } @@ -122,6 +128,7 @@ pageserver_connect() neon_log(LOG, "libpagestore: connected to '%s'", page_server_connstring_raw); connected = true; + return true; } /* @@ -149,8 +156,11 @@ retry: if (event.events & WL_SOCKET_READABLE) { if (!PQconsumeInput(pageserver_conn)) - neon_log(ERROR, "could not get response from pageserver: %s", + { + neon_log(LOG, "could not get response from pageserver: %s", PQerrorMessage(pageserver_conn)); + return -1; + } } goto retry; @@ -190,31 +200,62 @@ static void pageserver_send(NeonRequest * request) { StringInfoData req_buff; + int n_reconnect_attempts = 0; /* If the connection was lost for some reason, reconnect */ if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD) pageserver_disconnect(); - if (!connected) - pageserver_connect(); req_buff = nm_pack_request(request); /* - * Send request. - * - * In principle, this could block if the output buffer is full, and we - * should use async mode and check for interrupts while waiting. In - * practice, our requests are small enough to always fit in the output and - * TCP buffer. + * If pageserver is stopped, the connections from compute node are broken. + * The compute node doesn't notice that immediately, but it will cause the next request to fail, usually on the next query. + * That causes user-visible errors if pageserver is restarted, or the tenant is moved from one pageserver to another. + * See https://github.com/neondatabase/neon/issues/1138 + * So try to reestablish connection in case of failure. */ - if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0) + while (true) { - char *msg = pchomp(PQerrorMessage(pageserver_conn)); + if (!connected) + { + if (!pageserver_connect(n_reconnect_attempts < MAX_RECONNECT_ATTEMPTS ? LOG : ERROR)) + { + n_reconnect_attempts += 1; + pg_usleep(RECONNECT_INTERVAL_USEC); + continue; + } + } - pageserver_disconnect(); - neon_log(ERROR, "failed to send page request: %s", msg); + /* + * Send request. + * + * In principle, this could block if the output buffer is full, and we + * should use async mode and check for interrupts while waiting. In + * practice, our requests are small enough to always fit in the output and + * TCP buffer. + */ + if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0) + { + char *msg = pchomp(PQerrorMessage(pageserver_conn)); + if (n_reconnect_attempts < MAX_RECONNECT_ATTEMPTS) + { + neon_log(LOG, "failed to send page request (try to reconnect): %s", msg); + if (n_reconnect_attempts != 0) /* do not sleep before first reconnect attempt, assuming that pageserver is already restarted */ + pg_usleep(RECONNECT_INTERVAL_USEC); + n_reconnect_attempts += 1; + continue; + } + else + { + pageserver_disconnect(); + neon_log(ERROR, "failed to send page request: %s", msg); + } + } + break; } + pfree(req_buff.data); n_unflushed_requests++; diff --git a/test_runner/regress/test_pageserver_restart.py b/test_runner/regress/test_pageserver_restart.py index 6388e979e5..453ddec0d4 100644 --- a/test_runner/regress/test_pageserver_restart.py +++ b/test_runner/regress/test_pageserver_restart.py @@ -45,14 +45,6 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder): env.pageserver.stop() env.pageserver.start() - # Stopping the pageserver breaks the connection from the postgres backend to - # the page server, and causes the next query on the connection to fail. Start a new - # postgres connection too, to avoid that error. (Ideally, the compute node would - # handle that and retry internally, without propagating the error to the user, but - # currently it doesn't...) - pg_conn = pg.connect() - cur = pg_conn.cursor() - cur.execute("SELECT count(*) FROM foo") assert cur.fetchone() == (100000,) @@ -70,8 +62,6 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder): assert tenant_status["state"] == "Loading" # Try to read. This waits until the loading finishes, and then return normally. - pg_conn = pg.connect() - cur = pg_conn.cursor() cur.execute("SELECT count(*) FROM foo") assert cur.fetchone() == (100000,) @@ -132,14 +122,6 @@ def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder): env.pageserver.stop(immediate=True) env.pageserver.start() - # Stopping the pageserver breaks the connection from the postgres backend to - # the page server, and causes the next query on the connection to fail. Start a new - # postgres connection too, to avoid that error. (Ideally, the compute node would - # handle that and retry internally, without propagating the error to the user, but - # currently it doesn't...) - pg_conn = pg.connect() - cur = pg_conn.cursor() - # Check that all the updates are visible num_updates = pg.safe_psql("SELECT sum(updates) FROM foo")[0][0] assert num_updates == i * 100000 diff --git a/test_runner/regress/test_pageserver_restarts_under_workload.py b/test_runner/regress/test_pageserver_restarts_under_workload.py new file mode 100644 index 0000000000..28159778fe --- /dev/null +++ b/test_runner/regress/test_pageserver_restarts_under_workload.py @@ -0,0 +1,35 @@ +# This test spawns pgbench in a thread in the background and concurrently restarts pageserver, +# checking how client is able to transparently restore connection to pageserver +# +import threading +import time + +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnv, PgBin, Postgres + + +# Test restarting page server, while safekeeper and compute node keep +# running. +def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgBin): + env = neon_simple_env + env.neon_cli.create_branch("test_pageserver_restarts") + pg = env.postgres.create_start("test_pageserver_restarts") + n_restarts = 10 + scale = 10 + + def run_pgbench(pg: Postgres): + connstr = pg.connstr() + log.info(f"Start a pgbench workload on pg {connstr}") + pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr]) + pg_bin.run_capture(["pgbench", f"-T{n_restarts}", connstr]) + + thread = threading.Thread(target=run_pgbench, args=(pg,), daemon=True) + thread.start() + + for i in range(n_restarts): + # Stop the pageserver gracefully and restart it. + time.sleep(1) + env.pageserver.stop() + env.pageserver.start() + + thread.join()