diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index 9fd88e5818..373f05ab2f 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -710,8 +710,12 @@ impl ComputeNode {
     // `pg_ctl` for start / stop, so this just seems much easier to do as we already
     // have opened connection to Postgres and superuser access.
     #[instrument(skip_all)]
-    fn pg_reload_conf(&self, client: &mut Client) -> Result<()> {
-        client.simple_query("SELECT pg_reload_conf()")?;
+    fn pg_reload_conf(&self) -> Result<()> {
+        let pgctl_bin = Path::new(&self.pgbin).parent().unwrap().join("pg_ctl");
+        Command::new(pgctl_bin)
+            .args(["reload", "-D", &self.pgdata])
+            .output()
+            .expect("cannot run pg_ctl process");
         Ok(())
     }
 
@@ -724,9 +728,9 @@ impl ComputeNode {
         // Write new config
         let pgdata_path = Path::new(&self.pgdata);
         config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?;
+        self.pg_reload_conf()?;
 
         let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
-        self.pg_reload_conf(&mut client)?;
 
         // Proceed with post-startup configuration. Note, that order of operations is important.
         // Disable DDL forwarding because control plane already knows about these roles/databases.
diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c
index 0a944a6bf0..cc09fb849d 100644
--- a/pgxn/neon/libpagestore.c
+++ b/pgxn/neon/libpagestore.c
@@ -19,7 +19,10 @@
 #include "access/xlog.h"
 #include "access/xlogutils.h"
 #include "storage/buf_internals.h"
+#include "storage/lwlock.h"
+#include "storage/ipc.h"
 #include "c.h"
+#include "postmaster/interrupt.h"
 
 #include "libpq-fe.h"
 #include "libpq/pqformat.h"
@@ -61,23 +64,63 @@
 int			flush_every_n_requests = 8;
 int			n_reconnect_attempts = 0;
 int			max_reconnect_attempts = 60;
 
+#define MAX_PAGESERVER_CONNSTRING_SIZE 256
+
+typedef struct
+{
+	LWLockId	lock;
+	pg_atomic_uint64 update_counter;
+	char		pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
+} PagestoreShmemState;
+
+#if PG_VERSION_NUM >= 150000
+static shmem_request_hook_type prev_shmem_request_hook = NULL;
+static void pagestore_shmem_request(void);
+#endif
+static shmem_startup_hook_type prev_shmem_startup_hook;
+static PagestoreShmemState *pagestore_shared;
+static uint64 pagestore_local_counter = 0;
+static char local_pageserver_connstring[MAX_PAGESERVER_CONNSTRING_SIZE];
+
 bool		(*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
 
 static bool pageserver_flush(void);
 static void pageserver_disconnect(void);
-
-static pqsigfunc prev_signal_handler;
+static bool
+CheckPageserverConnstring(char **newval, void **extra, GucSource source)
+{
+	return strlen(*newval) < MAX_PAGESERVER_CONNSTRING_SIZE;
+}
 
 static void
-pageserver_sighup_handler(SIGNAL_ARGS)
+AssignPageserverConnstring(const char *newval, void *extra)
 {
-	if (prev_signal_handler)
-	{
-		prev_signal_handler(postgres_signal_arg);
-	}
-	neon_log(LOG, "Received SIGHUP, disconnecting pageserver. New pageserver connstring is %s", page_server_connstring);
-	pageserver_disconnect();
+	if(!pagestore_shared)
+		return;
+	LWLockAcquire(pagestore_shared->lock, LW_EXCLUSIVE);
+	strlcpy(pagestore_shared->pageserver_connstring, newval, MAX_PAGESERVER_CONNSTRING_SIZE);
+	pg_atomic_fetch_add_u64(&pagestore_shared->update_counter, 1);
+	LWLockRelease(pagestore_shared->lock);
+}
+
+static bool
+CheckConnstringUpdated()
+{
+	if(!pagestore_shared)
+		return false;
+	return pagestore_local_counter < pg_atomic_read_u64(&pagestore_shared->update_counter);
+}
+
+static void
+ReloadConnstring()
+{
+	if(!pagestore_shared)
+		return;
+	LWLockAcquire(pagestore_shared->lock, LW_SHARED);
+	strlcpy(local_pageserver_connstring, pagestore_shared->pageserver_connstring, sizeof(local_pageserver_connstring));
+	pagestore_local_counter = pg_atomic_read_u64(&pagestore_shared->update_counter);
+	LWLockRelease(pagestore_shared->lock);
 }
 
 static bool
@@ -91,6 +134,11 @@ pageserver_connect(int elevel)
 
 	Assert(!connected);
 
+	if(CheckConnstringUpdated())
+	{
+		ReloadConnstring();
+	}
+
 	/*
 	 * Connect using the connection string we got from the
 	 * neon.pageserver_connstring GUC. If the NEON_AUTH_TOKEN environment
@@ -110,7 +158,7 @@ pageserver_connect(int elevel)
 			n++;
 		}
 		keywords[n] = "dbname";
-		values[n] = page_server_connstring;
+		values[n] = local_pageserver_connstring;
 		n++;
 		keywords[n] = NULL;
 		values[n] = NULL;
@@ -254,6 +302,12 @@ pageserver_send(NeonRequest * request)
 {
 	StringInfoData req_buff;
 
+	if(CheckConnstringUpdated())
+	{
+		pageserver_disconnect();
+		ReloadConnstring();
+	}
+
 	/* If the connection was lost for some reason, reconnect */
 	if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
 	{
@@ -274,6 +328,7 @@ pageserver_send(NeonRequest * request)
 	{
 		while (!pageserver_connect(n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
 		{
+			HandleMainLoopInterrupts();
 			n_reconnect_attempts += 1;
 			pg_usleep(RECONNECT_INTERVAL_USEC);
 		}
@@ -391,7 +446,8 @@ pageserver_flush(void)
 	return true;
 }
 
-page_server_api api = {
+page_server_api api =
+{
 	.send = pageserver_send,
 	.flush = pageserver_flush,
 	.receive = pageserver_receive
@@ -405,12 +461,72 @@ check_neon_id(char **newval, void **extra, GucSource source)
 	return **newval == '\0' || HexDecodeString(id, *newval, 16);
 }
 
+static Size
+PagestoreShmemSize(void)
+{
+	return sizeof(PagestoreShmemState);
+}
+
+static bool
+PagestoreShmemInit(void)
+{
+	bool		found;
+	LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+	pagestore_shared = ShmemInitStruct("libpagestore shared state",
+									   PagestoreShmemSize(),
+									   &found);
+	if(!found)
+	{
+		pagestore_shared->lock = &(GetNamedLWLockTranche("neon_libpagestore")->lock);
+		pg_atomic_init_u64(&pagestore_shared->update_counter, 0);
+		AssignPageserverConnstring(page_server_connstring, NULL);
+	}
+	LWLockRelease(AddinShmemInitLock);
+	return found;
+}
+
+static void
+pagestore_shmem_startup_hook(void)
+{
+	if(prev_shmem_startup_hook)
+		prev_shmem_startup_hook();
+
+	PagestoreShmemInit();
+}
+
+static void
+pagestore_shmem_request(void)
+{
+#if PG_VERSION_NUM >= 150000
+	if(prev_shmem_request_hook)
+		prev_shmem_request_hook();
+#endif
+
+	RequestAddinShmemSpace(PagestoreShmemSize());
+	RequestNamedLWLockTranche("neon_libpagestore", 1);
+}
+
+static void
+pagestore_prepare_shmem(void)
+{
+#if PG_VERSION_NUM >= 150000
+	prev_shmem_request_hook = shmem_request_hook;
+	shmem_request_hook = pagestore_shmem_request;
+#else
+	pagestore_shmem_request();
+#endif
+	prev_shmem_startup_hook = shmem_startup_hook;
+	shmem_startup_hook = pagestore_shmem_startup_hook;
+}
+
 /*
  * Module initialization function
  */
 void
 pg_init_libpagestore(void)
 {
+	pagestore_prepare_shmem();
+
 	DefineCustomStringVariable("neon.pageserver_connstring",
 							   "connection string to the page server",
 							   NULL,
@@ -418,7 +534,7 @@ pg_init_libpagestore(void)
 							   "",
 							   PGC_SIGHUP,
 							   0,	/* no flags required */
-							   NULL, NULL, NULL);
+							   CheckPageserverConnstring, AssignPageserverConnstring, NULL);
 	DefineCustomStringVariable("neon.timeline_id",
 							   "Neon timeline_id the server is running on",
 							   NULL,
@@ -499,7 +615,5 @@ pg_init_libpagestore(void)
 		redo_read_buffer_filter = neon_redo_read_buffer_filter;
 	}
 
-	prev_signal_handler = pqsignal(SIGHUP, pageserver_sighup_handler);
-
 	lfc_init();
 }
diff --git a/test_runner/regress/test_change_pageserver.py b/test_runner/regress/test_change_pageserver.py
index 31092b70b9..410bf03c2b 100644
--- a/test_runner/regress/test_change_pageserver.py
+++ b/test_runner/regress/test_change_pageserver.py
@@ -1,9 +1,13 @@
+import asyncio
+
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import NeonEnvBuilder
 from fixtures.remote_storage import RemoteStorageKind
 
 
 def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
+    num_connections = 3
+
     neon_env_builder.num_pageservers = 2
     neon_env_builder.enable_pageserver_remote_storage(
         remote_storage_kind=RemoteStorageKind.MOCK_S3,
@@ -16,15 +20,24 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
     alt_pageserver_id = env.pageservers[1].id
     env.pageservers[1].tenant_attach(env.initial_tenant)
 
-    pg_conn = endpoint.connect()
-    cur = pg_conn.cursor()
+    pg_conns = [endpoint.connect() for i in range(num_connections)]
+    curs = [pg_conn.cursor() for pg_conn in pg_conns]
+
+    def execute(statement: str):
+        for cur in curs:
+            cur.execute(statement)
+
+    def fetchone():
+        results = [cur.fetchone() for cur in curs]
+        assert all(result == results[0] for result in results)
+        return results[0]
 
     # Create table, and insert some rows. Make it big enough that it doesn't fit in
     # shared_buffers, otherwise the SELECT after restart will just return answer
     # from shared_buffers without hitting the page server, which defeats the point
     # of this test.
-    cur.execute("CREATE TABLE foo (t text)")
-    cur.execute(
+    curs[0].execute("CREATE TABLE foo (t text)")
+    curs[0].execute(
         """
         INSERT INTO foo
             SELECT 'long string to consume some space' || g
@@ -33,25 +46,25 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
     )
 
     # Verify that the table is larger than shared_buffers
-    cur.execute(
+    curs[0].execute(
         """
         select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
         from pg_settings where name = 'shared_buffers'
         """
     )
-    row = cur.fetchone()
+    row = curs[0].fetchone()
     assert row is not None
     log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
     assert int(row[0]) < int(row[1])
 
-    cur.execute("SELECT count(*) FROM foo")
-    assert cur.fetchone() == (100000,)
+    execute("SELECT count(*) FROM foo")
+    assert fetchone() == (100000,)
 
     endpoint.reconfigure(pageserver_id=alt_pageserver_id)
 
     # Verify that the neon.pageserver_connstring GUC is set to the correct thing
-    cur.execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
-    connstring = cur.fetchone()
+    execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
+    connstring = fetchone()
     assert connstring is not None
     expected_connstring = f"postgresql://no_user:@localhost:{env.pageservers[1].service_port.pg}"
-    assert expected_connstring == expected_connstring
+    assert connstring[0] == expected_connstring
@@ -60,5 +73,45 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
         0
     ].stop()  # Stop the old pageserver just to make sure we're reading from the new one
 
-    cur.execute("SELECT count(*) FROM foo")
-    assert cur.fetchone() == (100000,)
+    execute("SELECT count(*) FROM foo")
+    assert fetchone() == (100000,)
+
+    # Try failing back, and this time we will stop the current pageserver before reconfiguring
+    # the endpoint. Whereas the previous reconfiguration was like a healthy migration, this
+    # is more like what happens in an unexpected pageserver failure.
+    env.pageservers[0].start()
+    env.pageservers[1].stop()
+
+    endpoint.reconfigure(pageserver_id=env.pageservers[0].id)
+
+    execute("SELECT count(*) FROM foo")
+    assert fetchone() == (100000,)
+
+    env.pageservers[0].stop()
+    env.pageservers[1].start()
+
+    # Test a (former) bug where a child process spins without updating its connection string
+    # by executing a query separately. This query will hang until we issue the reconfigure.
+    async def reconfigure_async():
+        await asyncio.sleep(
+            1
+        )  # Sleep for 1 second just to make sure we actually started our count(*) query
+        endpoint.reconfigure(pageserver_id=env.pageservers[1].id)
+
+    def execute_count():
+        execute("SELECT count(*) FROM foo")
+
+    async def execute_and_reconfigure():
+        task_exec = asyncio.to_thread(execute_count)
+        task_reconfig = asyncio.create_task(reconfigure_async())
+        await asyncio.gather(
+            task_exec,
+            task_reconfig,
+        )
+
+    asyncio.run(execute_and_reconfigure())
+    assert fetchone() == (100000,)
+
+    # One final check that nothing hangs
+    execute("SELECT count(*) FROM foo")
+    assert fetchone() == (100000,)