From 89554af1bd7c0ecfd79b5aab87629f042425f66d Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Thu, 24 Jul 2025 11:44:45 -0500 Subject: [PATCH] [BRC-1778] Have PG signal compute_ctl to refresh configuration if it suspects that it is talking to the wrong PSs (#12712) ## Problem This is a follow-up to TODO, as part of the effort to rewire the compute reconfiguration/notification mechanism to make it more robust. Please refer to that commit or ticket BRC-1778 for full context of the problem. ## Summary of changes The previous change added mechanism in `compute_ctl` that makes it possible to refresh the configuration of PG on-demand by having `compute_ctl` go out to download a new config from the control plane/HCC. This change wired this mechanism up with PG so that PG will signal `compute_ctl` to refresh its configuration when it suspects that it could be talking to incorrect pageservers due to a stale configuration. PG will become suspicious that it is talking to the wrong pageservers in the following situations: 1. It cannot connect to a pageserver (e.g., getting a network-level connection refused error) 2. It can connect to a pageserver, but the pageserver does not return any data for the GetPage request 3. It can connect to a pageserver, but the pageserver returns a malformed response 4. It can connect to a pageserver, but there is an error receiving the GetPage request response for any other reason This change also includes a minor tweak to `compute_ctl`'s config refresh behavior. Upon receiving a request to refresh PG configuration, `compute_ctl` will reach out to download a config, but it will not attempt to apply the configuration if the config is the same as the old config is it replacing. This optimization is added because the act of reconfiguring itself requires working pageserver connections. In many failure situations it is likely that PG detects an issue with a pageserver before the control plane can detect the issue, migrate tenants, and update the compute config. In this case even the latest compute config won't point PG to working pageservers, causing the configuration attempt to hang and negatively impact PG's time-to-recovery. With this change, `compute_ctl` only attempts reconfiguration if the refreshed config points PG to different pageservers. ## How is this tested? The new code paths are exercised in all existing tests because this mechanism is on by default. Explicitly tested in `test_runner/regress/test_change_pageserver.py`. Co-authored-by: William Huang --- compute_tools/src/configurator.rs | 13 +++ pgxn/neon/extension_server.c | 6 +- pgxn/neon/libpagestore.c | 90 ++++++++++++++++++- test_runner/fixtures/neon_fixtures.py | 16 +++- test_runner/fixtures/workload.py | 11 ++- test_runner/regress/test_change_pageserver.py | 2 +- 6 files changed, 124 insertions(+), 14 deletions(-) diff --git a/compute_tools/src/configurator.rs b/compute_tools/src/configurator.rs index 864335fd2c..93900b5c2b 100644 --- a/compute_tools/src/configurator.rs +++ b/compute_tools/src/configurator.rs @@ -101,6 +101,19 @@ fn configurator_main_loop(compute: &Arc) { // node out of the `RefreshConfigurationPending` state. Would be nice if we can encode this invariant // into the type system. assert_eq!(state.status, ComputeStatus::RefreshConfigurationPending); + + if state.pspec.as_ref().map(|ps| ps.pageserver_connstr.clone()) + == Some(pspec.pageserver_connstr.clone()) + { + info!( + "Refresh configuration: Retrieved spec is the same as the current spec. Waiting for control plane to update the spec before attempting reconfiguration." + ); + state.status = ComputeStatus::Running; + compute.state_changed.notify_all(); + drop(state); + std::thread::sleep(std::time::Duration::from_secs(5)); + continue; + } // state.pspec is consumed by compute.reconfigure() below. Note that compute.reconfigure() will acquire // the compute.state lock again so we need to have the lock guard go out of scope here. We could add a // "locked" variant of compute.reconfigure() that takes the lock guard as an argument to make this cleaner, diff --git a/pgxn/neon/extension_server.c b/pgxn/neon/extension_server.c index 00dcb6920e..d64cd3e4af 100644 --- a/pgxn/neon/extension_server.c +++ b/pgxn/neon/extension_server.c @@ -14,7 +14,7 @@ #include "extension_server.h" #include "neon_utils.h" -static int extension_server_port = 0; +int hadron_extension_server_port = 0; static int extension_server_request_timeout = 60; static int extension_server_connect_timeout = 60; @@ -47,7 +47,7 @@ neon_download_extension_file_http(const char *filename, bool is_library) curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT, (long)extension_server_connect_timeout /* seconds */ ); compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s", - extension_server_port, filename, is_library ? "?is_library=true" : ""); + hadron_extension_server_port, filename, is_library ? "?is_library=true" : ""); elog(LOG, "Sending request to compute_ctl: %s", compute_ctl_url); @@ -82,7 +82,7 @@ pg_init_extension_server() DefineCustomIntVariable("neon.extension_server_port", "connection string to the compute_ctl", NULL, - &extension_server_port, + &hadron_extension_server_port, 0, 0, INT_MAX, PGC_POSTMASTER, 0, /* no flags required */ diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index caffdc9612..ab0736e180 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -13,6 +13,8 @@ #include #include +#include + #include "libpq-int.h" #include "access/xlog.h" @@ -86,6 +88,8 @@ static int pageserver_response_log_timeout = 10000; /* 2.5 minutes. A bit higher than highest default TCP retransmission timeout */ static int pageserver_response_disconnect_timeout = 150000; +static int conf_refresh_reconnect_attempt_threshold = 16; + typedef struct { char connstring[MAX_SHARDS][MAX_PAGESERVER_CONNSTRING_SIZE]; @@ -130,7 +134,7 @@ static uint64 pagestore_local_counter = 0; typedef enum PSConnectionState { PS_Disconnected, /* no connection yet */ PS_Connecting_Startup, /* connection starting up */ - PS_Connecting_PageStream, /* negotiating pagestream */ + PS_Connecting_PageStream, /* negotiating pagestream */ PS_Connected, /* connected, pagestream established */ } PSConnectionState; @@ -401,7 +405,7 @@ get_shard_number(BufferTag *tag) } static inline void -CLEANUP_AND_DISCONNECT(PageServer *shard) +CLEANUP_AND_DISCONNECT(PageServer *shard) { if (shard->wes_read) { @@ -423,7 +427,7 @@ CLEANUP_AND_DISCONNECT(PageServer *shard) * complete the connection (e.g. due to receiving an earlier cancellation * during connection start). * Returns true if successfully connected; false if the connection failed. - * + * * Throws errors in unrecoverable situations, or when this backend's query * is canceled. */ @@ -1030,6 +1034,61 @@ pageserver_disconnect_shard(shardno_t shard_no) shard->state = PS_Disconnected; } +// BEGIN HADRON +/* + * Nudge compute_ctl to refresh our configuration. Called when we suspect we may be + * connecting to the wrong pageservers due to a stale configuration. + * + * This is a best-effort operation. If we couldn't send the local loopback HTTP request + * to compute_ctl or if the request fails for any reason, we just log the error and move + * on. + */ + +extern int hadron_extension_server_port; + +static void +hadron_request_configuration_refresh() { + static CURL *handle = NULL; + CURLcode res; + char *compute_ctl_url; + + if (!lakebase_mode) + return; + + if (handle == NULL) + { + handle = alloc_curl_handle(); + + curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "POST"); + curl_easy_setopt(handle, CURLOPT_TIMEOUT, 3L /* seconds */ ); + curl_easy_setopt(handle, CURLOPT_POSTFIELDS, ""); + } + + // Set the URL + compute_ctl_url = psprintf("http://localhost:%d/refresh_configuration", hadron_extension_server_port); + + + elog(LOG, "Sending refresh configuration request to compute_ctl: %s", compute_ctl_url); + + curl_easy_setopt(handle, CURLOPT_URL, compute_ctl_url); + + res = curl_easy_perform(handle); + if (res != CURLE_OK) + { + elog(WARNING, "compute_ctl refresh_configuration request failed: %s\n", curl_easy_strerror(res)); + } + + // In regular Postgres usage, it is not necessary to manually free memory allocated by palloc (psprintf) because + // it will be cleaned up after the "memory context" is reset (e.g. after the query or the transaction is finished). + // However, the number of times this function gets called during a single query/transaction can be unbounded due to + // the various retry loops around calls to pageservers. Therefore, we need to manually free this memory here. + if (compute_ctl_url != NULL) + { + pfree(compute_ctl_url); + } +} +// END HADRON + static bool pageserver_send(shardno_t shard_no, NeonRequest *request) { @@ -1064,6 +1123,9 @@ pageserver_send(shardno_t shard_no, NeonRequest *request) while (!pageserver_connect(shard_no, shard->n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR)) { shard->n_reconnect_attempts += 1; + if (shard->n_reconnect_attempts > conf_refresh_reconnect_attempt_threshold) { + hadron_request_configuration_refresh(); + } } shard->n_reconnect_attempts = 0; } else { @@ -1171,17 +1233,26 @@ pageserver_receive(shardno_t shard_no) pfree(msg); pageserver_disconnect(shard_no); resp = NULL; + + /* + * Always poke compute_ctl to request a configuration refresh if we have issues receiving data from pageservers after + * successfully connecting to it. It could be an indication that we are connecting to the wrong pageservers (e.g. PS + * is in secondary mode or otherwise refuses to respond our request). + */ + hadron_request_configuration_refresh(); } else if (rc == -2) { char *msg = pchomp(PQerrorMessage(pageserver_conn)); pageserver_disconnect(shard_no); + hadron_request_configuration_refresh(); neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: could not read COPY data: %s", msg); } else { pageserver_disconnect(shard_no); + hadron_request_configuration_refresh(); neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: unexpected PQgetCopyData return value: %d", rc); } @@ -1249,18 +1320,21 @@ pageserver_try_receive(shardno_t shard_no) neon_shard_log(shard_no, LOG, "pageserver_receive disconnect: psql end of copy data: %s", pchomp(PQerrorMessage(pageserver_conn))); pageserver_disconnect(shard_no); resp = NULL; + hadron_request_configuration_refresh(); } else if (rc == -2) { char *msg = pchomp(PQerrorMessage(pageserver_conn)); pageserver_disconnect(shard_no); + hadron_request_configuration_refresh(); neon_shard_log(shard_no, LOG, "pageserver_receive disconnect: could not read COPY data: %s", msg); resp = NULL; } else { pageserver_disconnect(shard_no); + hadron_request_configuration_refresh(); neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: unexpected PQgetCopyData return value: %d", rc); } @@ -1460,6 +1534,16 @@ pg_init_libpagestore(void) PGC_SU_BACKEND, 0, /* no flags required */ NULL, NULL, NULL); + DefineCustomIntVariable("hadron.conf_refresh_reconnect_attempt_threshold", + "Threshold of the number of consecutive failed pageserver " + "connection attempts (per shard) before signaling " + "compute_ctl for a configuration refresh.", + NULL, + &conf_refresh_reconnect_attempt_threshold, + 16, 0, INT_MAX, + PGC_USERSET, + 0, + NULL, NULL, NULL); DefineCustomIntVariable("neon.pageserver_response_log_timeout", "pageserver response log timeout", diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index e02b3b12f8..687500404a 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4936,9 +4936,19 @@ class Endpoint(PgProtocol, LogUtils): # in the following commands. if safekeepers is not None: self.active_safekeepers = safekeepers - self.env.neon_cli.endpoint_reconfigure( - self.endpoint_id, self.tenant_id, pageserver_id, self.active_safekeepers - ) + + start_time = time.time() + while True: + try: + self.env.neon_cli.endpoint_reconfigure( + self.endpoint_id, self.tenant_id, pageserver_id, self.active_safekeepers + ) + return + except RuntimeError as e: + if time.time() - start_time > 120: + raise e + log.warning(f"Reconfigure failed with error: {e}. Retrying...") + time.sleep(5) def refresh_configuration(self): assert self.endpoint_id is not None diff --git a/test_runner/fixtures/workload.py b/test_runner/fixtures/workload.py index e17a8e989b..3ac61b5d8c 100644 --- a/test_runner/fixtures/workload.py +++ b/test_runner/fixtures/workload.py @@ -78,6 +78,9 @@ class Workload: """ if self._endpoint is not None: with ENDPOINT_LOCK: + # It's important that we update config.json before issuing the reconfigure request to make sure + # that PG-initiated spec refresh doesn't mess things up by reverting to the old spec. + self._endpoint.update_pageservers_in_config() self._endpoint.reconfigure() def endpoint(self, pageserver_id: int | None = None) -> Endpoint: @@ -97,10 +100,10 @@ class Workload: self._endpoint.start(pageserver_id=pageserver_id) self._configured_pageserver = pageserver_id else: - if self._configured_pageserver != pageserver_id: - self._configured_pageserver = pageserver_id - self._endpoint.reconfigure(pageserver_id=pageserver_id) - self._endpoint_config = pageserver_id + # It's important that we update config.json before issuing the reconfigure request to make sure + # that PG-initiated spec refresh doesn't mess things up by reverting to the old spec. + self._endpoint.update_pageservers_in_config(pageserver_id=pageserver_id) + self._endpoint.reconfigure(pageserver_id=pageserver_id) connstring = self._endpoint.safe_psql( "SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'" diff --git a/test_runner/regress/test_change_pageserver.py b/test_runner/regress/test_change_pageserver.py index bcdccac14e..af736af825 100644 --- a/test_runner/regress/test_change_pageserver.py +++ b/test_runner/regress/test_change_pageserver.py @@ -17,7 +17,7 @@ def reconfigure_endpoint(endpoint: Endpoint, pageserver_id: int, use_explicit_re # to make sure that PG-initiated config refresh doesn't mess things up by reverting to the old config. endpoint.update_pageservers_in_config(pageserver_id=pageserver_id) - # PG will eventually automatically refresh its configuration if it detects connectivity issues with pageservers. + # PG will automatically refresh its configuration if it detects connectivity issues with pageservers. # We also allow the test to explicitly request a reconfigure so that the test can be sure that the # endpoint is running with the latest configuration. #