diff --git a/compute_tools/src/configurator.rs b/compute_tools/src/configurator.rs index 864335fd2c..93900b5c2b 100644 --- a/compute_tools/src/configurator.rs +++ b/compute_tools/src/configurator.rs @@ -101,6 +101,19 @@ fn configurator_main_loop(compute: &Arc) { // node out of the `RefreshConfigurationPending` state. Would be nice if we can encode this invariant // into the type system. assert_eq!(state.status, ComputeStatus::RefreshConfigurationPending); + + if state.pspec.as_ref().map(|ps| ps.pageserver_connstr.clone()) + == Some(pspec.pageserver_connstr.clone()) + { + info!( + "Refresh configuration: Retrieved spec is the same as the current spec. Waiting for control plane to update the spec before attempting reconfiguration." + ); + state.status = ComputeStatus::Running; + compute.state_changed.notify_all(); + drop(state); + std::thread::sleep(std::time::Duration::from_secs(5)); + continue; + } // state.pspec is consumed by compute.reconfigure() below. Note that compute.reconfigure() will acquire // the compute.state lock again so we need to have the lock guard go out of scope here. We could add a // "locked" variant of compute.reconfigure() that takes the lock guard as an argument to make this cleaner, diff --git a/pgxn/neon/extension_server.c b/pgxn/neon/extension_server.c index 00dcb6920e..d64cd3e4af 100644 --- a/pgxn/neon/extension_server.c +++ b/pgxn/neon/extension_server.c @@ -14,7 +14,7 @@ #include "extension_server.h" #include "neon_utils.h" -static int extension_server_port = 0; +int hadron_extension_server_port = 0; static int extension_server_request_timeout = 60; static int extension_server_connect_timeout = 60; @@ -47,7 +47,7 @@ neon_download_extension_file_http(const char *filename, bool is_library) curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT, (long)extension_server_connect_timeout /* seconds */ ); compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s", - extension_server_port, filename, is_library ? "?is_library=true" : ""); + hadron_extension_server_port, filename, is_library ? "?is_library=true" : ""); elog(LOG, "Sending request to compute_ctl: %s", compute_ctl_url); @@ -82,7 +82,7 @@ pg_init_extension_server() DefineCustomIntVariable("neon.extension_server_port", "connection string to the compute_ctl", NULL, - &extension_server_port, + &hadron_extension_server_port, 0, 0, INT_MAX, PGC_POSTMASTER, 0, /* no flags required */ diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index caffdc9612..ab0736e180 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -13,6 +13,8 @@ #include #include +#include + #include "libpq-int.h" #include "access/xlog.h" @@ -86,6 +88,8 @@ static int pageserver_response_log_timeout = 10000; /* 2.5 minutes. A bit higher than highest default TCP retransmission timeout */ static int pageserver_response_disconnect_timeout = 150000; +static int conf_refresh_reconnect_attempt_threshold = 16; + typedef struct { char connstring[MAX_SHARDS][MAX_PAGESERVER_CONNSTRING_SIZE]; @@ -130,7 +134,7 @@ static uint64 pagestore_local_counter = 0; typedef enum PSConnectionState { PS_Disconnected, /* no connection yet */ PS_Connecting_Startup, /* connection starting up */ - PS_Connecting_PageStream, /* negotiating pagestream */ + PS_Connecting_PageStream, /* negotiating pagestream */ PS_Connected, /* connected, pagestream established */ } PSConnectionState; @@ -401,7 +405,7 @@ get_shard_number(BufferTag *tag) } static inline void -CLEANUP_AND_DISCONNECT(PageServer *shard) +CLEANUP_AND_DISCONNECT(PageServer *shard) { if (shard->wes_read) { @@ -423,7 +427,7 @@ CLEANUP_AND_DISCONNECT(PageServer *shard) * complete the connection (e.g. due to receiving an earlier cancellation * during connection start). * Returns true if successfully connected; false if the connection failed. - * + * * Throws errors in unrecoverable situations, or when this backend's query * is canceled. */ @@ -1030,6 +1034,61 @@ pageserver_disconnect_shard(shardno_t shard_no) shard->state = PS_Disconnected; } +// BEGIN HADRON +/* + * Nudge compute_ctl to refresh our configuration. Called when we suspect we may be + * connecting to the wrong pageservers due to a stale configuration. + * + * This is a best-effort operation. If we couldn't send the local loopback HTTP request + * to compute_ctl or if the request fails for any reason, we just log the error and move + * on. + */ + +extern int hadron_extension_server_port; + +static void +hadron_request_configuration_refresh() { + static CURL *handle = NULL; + CURLcode res; + char *compute_ctl_url; + + if (!lakebase_mode) + return; + + if (handle == NULL) + { + handle = alloc_curl_handle(); + + curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "POST"); + curl_easy_setopt(handle, CURLOPT_TIMEOUT, 3L /* seconds */ ); + curl_easy_setopt(handle, CURLOPT_POSTFIELDS, ""); + } + + // Set the URL + compute_ctl_url = psprintf("http://localhost:%d/refresh_configuration", hadron_extension_server_port); + + + elog(LOG, "Sending refresh configuration request to compute_ctl: %s", compute_ctl_url); + + curl_easy_setopt(handle, CURLOPT_URL, compute_ctl_url); + + res = curl_easy_perform(handle); + if (res != CURLE_OK) + { + elog(WARNING, "compute_ctl refresh_configuration request failed: %s\n", curl_easy_strerror(res)); + } + + // In regular Postgres usage, it is not necessary to manually free memory allocated by palloc (psprintf) because + // it will be cleaned up after the "memory context" is reset (e.g. after the query or the transaction is finished). + // However, the number of times this function gets called during a single query/transaction can be unbounded due to + // the various retry loops around calls to pageservers. Therefore, we need to manually free this memory here. + if (compute_ctl_url != NULL) + { + pfree(compute_ctl_url); + } +} +// END HADRON + static bool pageserver_send(shardno_t shard_no, NeonRequest *request) { @@ -1064,6 +1123,9 @@ pageserver_send(shardno_t shard_no, NeonRequest *request) while (!pageserver_connect(shard_no, shard->n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR)) { shard->n_reconnect_attempts += 1; + if (shard->n_reconnect_attempts > conf_refresh_reconnect_attempt_threshold) { + hadron_request_configuration_refresh(); + } } shard->n_reconnect_attempts = 0; } else { @@ -1171,17 +1233,26 @@ pageserver_receive(shardno_t shard_no) pfree(msg); pageserver_disconnect(shard_no); resp = NULL; + + /* + * Always poke compute_ctl to request a configuration refresh if we have issues receiving data from pageservers after + * successfully connecting to it. It could be an indication that we are connecting to the wrong pageservers (e.g. PS + * is in secondary mode or otherwise refuses to respond our request). + */ + hadron_request_configuration_refresh(); } else if (rc == -2) { char *msg = pchomp(PQerrorMessage(pageserver_conn)); pageserver_disconnect(shard_no); + hadron_request_configuration_refresh(); neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: could not read COPY data: %s", msg); } else { pageserver_disconnect(shard_no); + hadron_request_configuration_refresh(); neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: unexpected PQgetCopyData return value: %d", rc); } @@ -1249,18 +1320,21 @@ pageserver_try_receive(shardno_t shard_no) neon_shard_log(shard_no, LOG, "pageserver_receive disconnect: psql end of copy data: %s", pchomp(PQerrorMessage(pageserver_conn))); pageserver_disconnect(shard_no); resp = NULL; + hadron_request_configuration_refresh(); } else if (rc == -2) { char *msg = pchomp(PQerrorMessage(pageserver_conn)); pageserver_disconnect(shard_no); + hadron_request_configuration_refresh(); neon_shard_log(shard_no, LOG, "pageserver_receive disconnect: could not read COPY data: %s", msg); resp = NULL; } else { pageserver_disconnect(shard_no); + hadron_request_configuration_refresh(); neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: unexpected PQgetCopyData return value: %d", rc); } @@ -1460,6 +1534,16 @@ pg_init_libpagestore(void) PGC_SU_BACKEND, 0, /* no flags required */ NULL, NULL, NULL); + DefineCustomIntVariable("hadron.conf_refresh_reconnect_attempt_threshold", + "Threshold of the number of consecutive failed pageserver " + "connection attempts (per shard) before signaling " + "compute_ctl for a configuration refresh.", + NULL, + &conf_refresh_reconnect_attempt_threshold, + 16, 0, INT_MAX, + PGC_USERSET, + 0, + NULL, NULL, NULL); DefineCustomIntVariable("neon.pageserver_response_log_timeout", "pageserver response log timeout", diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index e02b3b12f8..687500404a 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4936,9 +4936,19 @@ class Endpoint(PgProtocol, LogUtils): # in the following commands. if safekeepers is not None: self.active_safekeepers = safekeepers - self.env.neon_cli.endpoint_reconfigure( - self.endpoint_id, self.tenant_id, pageserver_id, self.active_safekeepers - ) + + start_time = time.time() + while True: + try: + self.env.neon_cli.endpoint_reconfigure( + self.endpoint_id, self.tenant_id, pageserver_id, self.active_safekeepers + ) + return + except RuntimeError as e: + if time.time() - start_time > 120: + raise e + log.warning(f"Reconfigure failed with error: {e}. Retrying...") + time.sleep(5) def refresh_configuration(self): assert self.endpoint_id is not None diff --git a/test_runner/fixtures/workload.py b/test_runner/fixtures/workload.py index e17a8e989b..3ac61b5d8c 100644 --- a/test_runner/fixtures/workload.py +++ b/test_runner/fixtures/workload.py @@ -78,6 +78,9 @@ class Workload: """ if self._endpoint is not None: with ENDPOINT_LOCK: + # It's important that we update config.json before issuing the reconfigure request to make sure + # that PG-initiated spec refresh doesn't mess things up by reverting to the old spec. + self._endpoint.update_pageservers_in_config() self._endpoint.reconfigure() def endpoint(self, pageserver_id: int | None = None) -> Endpoint: @@ -97,10 +100,10 @@ class Workload: self._endpoint.start(pageserver_id=pageserver_id) self._configured_pageserver = pageserver_id else: - if self._configured_pageserver != pageserver_id: - self._configured_pageserver = pageserver_id - self._endpoint.reconfigure(pageserver_id=pageserver_id) - self._endpoint_config = pageserver_id + # It's important that we update config.json before issuing the reconfigure request to make sure + # that PG-initiated spec refresh doesn't mess things up by reverting to the old spec. + self._endpoint.update_pageservers_in_config(pageserver_id=pageserver_id) + self._endpoint.reconfigure(pageserver_id=pageserver_id) connstring = self._endpoint.safe_psql( "SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'" diff --git a/test_runner/regress/test_change_pageserver.py b/test_runner/regress/test_change_pageserver.py index bcdccac14e..af736af825 100644 --- a/test_runner/regress/test_change_pageserver.py +++ b/test_runner/regress/test_change_pageserver.py @@ -17,7 +17,7 @@ def reconfigure_endpoint(endpoint: Endpoint, pageserver_id: int, use_explicit_re # to make sure that PG-initiated config refresh doesn't mess things up by reverting to the old config. endpoint.update_pageservers_in_config(pageserver_id=pageserver_id) - # PG will eventually automatically refresh its configuration if it detects connectivity issues with pageservers. + # PG will automatically refresh its configuration if it detects connectivity issues with pageservers. # We also allow the test to explicitly request a reconfigure so that the test can be sure that the # endpoint is running with the latest configuration. #