diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c
index 3a7c0f1bb6..894c9729f2 100644
--- a/pgxn/neon/libpagestore.c
+++ b/pgxn/neon/libpagestore.c
@@ -64,10 +64,26 @@ static int max_reconnect_attempts = 60;
 
 #define MAX_PAGESERVER_CONNSTRING_SIZE 256
 
+/*
+ * The "neon.pageserver_connstring" GUC is marked with the PGC_SIGHUP option,
+ * allowing it to be changed using pg_reload_conf(). The control plane can
+ * update the connection string if the pageserver crashes, is relocated, or
+ * new shards are added. A copy of the current value of the GUC is kept in
+ * shared memory, updated by the postmaster, because regular backends don't
+ * reload the config during query execution, but we might need to re-establish
+ * the pageserver connection with the new connection string even in the middle
+ * of a query.
+ *
+ * The shared memory copy is protected by a lockless algorithm using two
+ * atomic counters. The counters allow a backend to quickly check if the value
+ * has changed since last access, and to detect and retry copying the value if
+ * the postmaster changes the value concurrently. (Postmaster doesn't have a
+ * PGPROC entry and therefore cannot use LWLocks.)
+ */
 typedef struct
 {
-	LWLockId	lock;
-	pg_atomic_uint64 update_counter;
+	pg_atomic_uint64 begin_update_counter;
+	pg_atomic_uint64 end_update_counter;
 	char		pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
 } PagestoreShmemState;
 
@@ -84,7 +100,7 @@ static bool pageserver_flush(void);
 static void pageserver_disconnect(void);
 
 static bool
-PagestoreShmemIsValid()
+PagestoreShmemIsValid(void)
 {
 	return pagestore_shared && UsedShmemSegAddr;
 }
@@ -98,31 +114,58 @@ CheckPageserverConnstring(char **newval, void **extra, GucSource source)
 static void
 AssignPageserverConnstring(const char *newval, void *extra)
 {
-	if (!PagestoreShmemIsValid())
+	/*
+	 * Only postmaster updates the copy in shared memory.
+	 */
+	if (!PagestoreShmemIsValid() || IsUnderPostmaster)
 		return;
-	LWLockAcquire(pagestore_shared->lock, LW_EXCLUSIVE);
+
+	pg_atomic_add_fetch_u64(&pagestore_shared->begin_update_counter, 1);
+	pg_write_barrier();
 	strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE);
-	pg_atomic_fetch_add_u64(&pagestore_shared->update_counter, 1);
-	LWLockRelease(pagestore_shared->lock);
+	pg_write_barrier();
+	pg_atomic_add_fetch_u64(&pagestore_shared->end_update_counter, 1);
 }
 
 static bool
-CheckConnstringUpdated()
+CheckConnstringUpdated(void)
 {
 	if (!PagestoreShmemIsValid())
 		return false;
-	return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->update_counter);
+	return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->begin_update_counter);
 }
 
 static void
-ReloadConnstring()
+ReloadConnstring(void)
 {
+	uint64		begin_update_counter;
+	uint64		end_update_counter;
+
 	if (!PagestoreShmemIsValid())
 		return;
-	LWLockAcquire(pagestore_shared->lock, LW_SHARED);
-	strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring));
-	pagestore_local_counter = pg_atomic_read_u64(&pagestore_shared->update_counter);
-	LWLockRelease(pagestore_shared->lock);
+
+	/*
+	 * Copy the current setting from shared to local memory. Postmaster can
+	 * update the value concurrently, in which case we would copy a garbled
+	 * mix of the old and new values. We will detect it because the counters
+	 * won't match, and retry. But it's important that we don't do anything
+	 * within the retry-loop that would depend on the string having valid
+	 * contents.
+	 */
+	do
+	{
+		begin_update_counter = pg_atomic_read_u64(&pagestore_shared->begin_update_counter);
+		end_update_counter = pg_atomic_read_u64(&pagestore_shared->end_update_counter);
+		pg_read_barrier();
+
+		strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring));
+		pg_read_barrier();
+	}
+	while (begin_update_counter != end_update_counter
+		   || begin_update_counter != pg_atomic_read_u64(&pagestore_shared->begin_update_counter)
+		   || end_update_counter != pg_atomic_read_u64(&pagestore_shared->end_update_counter));
+
+	pagestore_local_counter = end_update_counter;
 }
 
 static bool
@@ -137,7 +180,7 @@ pageserver_connect(int elevel)
 	static TimestampTz last_connect_time = 0;
 	static uint64_t delay_us = MIN_RECONNECT_INTERVAL_USEC;
 	TimestampTz now;
-    uint64_t us_since_last_connect;
+	uint64_t	us_since_last_connect;
 
 	Assert(!connected);
 
@@ -147,7 +190,7 @@ pageserver_connect(int elevel)
 	}
 
 	now = GetCurrentTimestamp();
-    us_since_last_connect = now - last_connect_time;
+	us_since_last_connect = now - last_connect_time;
 	if (us_since_last_connect < delay_us)
 	{
 		pg_usleep(delay_us - us_since_last_connect);
@@ -505,8 +548,8 @@ PagestoreShmemInit(void)
 									   &found);
 	if (!found)
 	{
-		pagestore_shared->lock = &(GetNamedLWLockTranche("neon_libpagestore")->lock);
-		pg_atomic_init_u64(&pagestore_shared->update_counter, 0);
+		pg_atomic_init_u64(&pagestore_shared->begin_update_counter, 0);
+		pg_atomic_init_u64(&pagestore_shared->end_update_counter, 0);
 		AssignPageserverConnstring(page_server_connstring, NULL);
 	}
 	LWLockRelease(AddinShmemInitLock);
@@ -531,7 +574,6 @@ pagestore_shmem_request(void)
 #endif
 
 	RequestAddinShmemSpace(PagestoreShmemSize());
-	RequestNamedLWLockTranche("neon_libpagestore", 1);
 }
 
 static void
diff --git a/test_runner/regress/test_pageserver_reconnect.py b/test_runner/regress/test_pageserver_reconnect.py
new file mode 100644
index 0000000000..aecfcdd262
--- /dev/null
+++ b/test_runner/regress/test_pageserver_reconnect.py
@@ -0,0 +1,42 @@
+import threading
+import time
+from contextlib import closing
+
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import NeonEnv, PgBin
+
+
+# Test updating the neon.pageserver_connstring setting on the fly, while a
+# pgbench workload is running against the endpoint in a background thread.
+#
+# This merely toggles a trailing space in the connection string, so it doesn't
+# prove that the new string actually takes effect. But the code gets exercised.
+def test_pageserver_reconnect(neon_simple_env: NeonEnv, pg_bin: PgBin):
+    env = neon_simple_env
+    env.neon_cli.create_branch("test_pageserver_restarts")
+    endpoint = env.endpoints.create_start("test_pageserver_restarts")
+    n_reconnects = 1000
+    timeout = 0.01
+    scale = 10
+
+    def run_pgbench(connstr: str):
+        log.info(f"Start a pgbench workload on pg {connstr}")
+        pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
+        pg_bin.run_capture(["pgbench", f"-T{int(n_reconnects*timeout)}", connstr])
+
+    thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True)
+    thread.start()
+
+    with closing(endpoint.connect()) as con:
+        with con.cursor() as c:
+            c.execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
+            connstring = c.fetchall()[0][0]
+            for i in range(n_reconnects):
+                time.sleep(timeout)
+                c.execute(
+                    "alter system set neon.pageserver_connstring=%s",
+                    (connstring + (" " * (i % 2)),),
+                )
+                c.execute("select pg_reload_conf()")
+
+    thread.join()