mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-13 08:22:55 +00:00
[BRC-1778] Have PG signal compute_ctl to refresh configuration if it suspects that it is talking to the wrong PSs (#12712)
## Problem This is a follow-up to TODO, as part of the effort to rewire the compute reconfiguration/notification mechanism to make it more robust. Please refer to that commit or ticket BRC-1778 for full context of the problem. ## Summary of changes The previous change added mechanism in `compute_ctl` that makes it possible to refresh the configuration of PG on-demand by having `compute_ctl` go out to download a new config from the control plane/HCC. This change wired this mechanism up with PG so that PG will signal `compute_ctl` to refresh its configuration when it suspects that it could be talking to incorrect pageservers due to a stale configuration. PG will become suspicious that it is talking to the wrong pageservers in the following situations: 1. It cannot connect to a pageserver (e.g., getting a network-level connection refused error) 2. It can connect to a pageserver, but the pageserver does not return any data for the GetPage request 3. It can connect to a pageserver, but the pageserver returns a malformed response 4. It can connect to a pageserver, but there is an error receiving the GetPage request response for any other reason This change also includes a minor tweak to `compute_ctl`'s config refresh behavior. Upon receiving a request to refresh PG configuration, `compute_ctl` will reach out to download a config, but it will not attempt to apply the configuration if the config is the same as the old config is it replacing. This optimization is added because the act of reconfiguring itself requires working pageserver connections. In many failure situations it is likely that PG detects an issue with a pageserver before the control plane can detect the issue, migrate tenants, and update the compute config. In this case even the latest compute config won't point PG to working pageservers, causing the configuration attempt to hang and negatively impact PG's time-to-recovery. With this change, `compute_ctl` only attempts reconfiguration if the refreshed config points PG to different pageservers. ## How is this tested? The new code paths are exercised in all existing tests because this mechanism is on by default. Explicitly tested in `test_runner/regress/test_change_pageserver.py`. Co-authored-by: William Huang <william.huang@databricks.com>
This commit is contained in:
@@ -101,6 +101,19 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
|
||||
// node out of the `RefreshConfigurationPending` state. Would be nice if we can encode this invariant
|
||||
// into the type system.
|
||||
assert_eq!(state.status, ComputeStatus::RefreshConfigurationPending);
|
||||
|
||||
if state.pspec.as_ref().map(|ps| ps.pageserver_connstr.clone())
|
||||
== Some(pspec.pageserver_connstr.clone())
|
||||
{
|
||||
info!(
|
||||
"Refresh configuration: Retrieved spec is the same as the current spec. Waiting for control plane to update the spec before attempting reconfiguration."
|
||||
);
|
||||
state.status = ComputeStatus::Running;
|
||||
compute.state_changed.notify_all();
|
||||
drop(state);
|
||||
std::thread::sleep(std::time::Duration::from_secs(5));
|
||||
continue;
|
||||
}
|
||||
// state.pspec is consumed by compute.reconfigure() below. Note that compute.reconfigure() will acquire
|
||||
// the compute.state lock again so we need to have the lock guard go out of scope here. We could add a
|
||||
// "locked" variant of compute.reconfigure() that takes the lock guard as an argument to make this cleaner,
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
#include "extension_server.h"
|
||||
#include "neon_utils.h"
|
||||
|
||||
static int extension_server_port = 0;
|
||||
int hadron_extension_server_port = 0;
|
||||
static int extension_server_request_timeout = 60;
|
||||
static int extension_server_connect_timeout = 60;
|
||||
|
||||
@@ -47,7 +47,7 @@ neon_download_extension_file_http(const char *filename, bool is_library)
|
||||
curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT, (long)extension_server_connect_timeout /* seconds */ );
|
||||
|
||||
compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s",
|
||||
extension_server_port, filename, is_library ? "?is_library=true" : "");
|
||||
hadron_extension_server_port, filename, is_library ? "?is_library=true" : "");
|
||||
|
||||
elog(LOG, "Sending request to compute_ctl: %s", compute_ctl_url);
|
||||
|
||||
@@ -82,7 +82,7 @@ pg_init_extension_server()
|
||||
DefineCustomIntVariable("neon.extension_server_port",
|
||||
"connection string to the compute_ctl",
|
||||
NULL,
|
||||
&extension_server_port,
|
||||
&hadron_extension_server_port,
|
||||
0, 0, INT_MAX,
|
||||
PGC_POSTMASTER,
|
||||
0, /* no flags required */
|
||||
|
||||
@@ -13,6 +13,8 @@
|
||||
#include <math.h>
|
||||
#include <sys/socket.h>
|
||||
|
||||
#include <curl/curl.h>
|
||||
|
||||
#include "libpq-int.h"
|
||||
|
||||
#include "access/xlog.h"
|
||||
@@ -86,6 +88,8 @@ static int pageserver_response_log_timeout = 10000;
|
||||
/* 2.5 minutes. A bit higher than highest default TCP retransmission timeout */
|
||||
static int pageserver_response_disconnect_timeout = 150000;
|
||||
|
||||
static int conf_refresh_reconnect_attempt_threshold = 16;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
char connstring[MAX_SHARDS][MAX_PAGESERVER_CONNSTRING_SIZE];
|
||||
@@ -130,7 +134,7 @@ static uint64 pagestore_local_counter = 0;
|
||||
typedef enum PSConnectionState {
|
||||
PS_Disconnected, /* no connection yet */
|
||||
PS_Connecting_Startup, /* connection starting up */
|
||||
PS_Connecting_PageStream, /* negotiating pagestream */
|
||||
PS_Connecting_PageStream, /* negotiating pagestream */
|
||||
PS_Connected, /* connected, pagestream established */
|
||||
} PSConnectionState;
|
||||
|
||||
@@ -401,7 +405,7 @@ get_shard_number(BufferTag *tag)
|
||||
}
|
||||
|
||||
static inline void
|
||||
CLEANUP_AND_DISCONNECT(PageServer *shard)
|
||||
CLEANUP_AND_DISCONNECT(PageServer *shard)
|
||||
{
|
||||
if (shard->wes_read)
|
||||
{
|
||||
@@ -423,7 +427,7 @@ CLEANUP_AND_DISCONNECT(PageServer *shard)
|
||||
* complete the connection (e.g. due to receiving an earlier cancellation
|
||||
* during connection start).
|
||||
* Returns true if successfully connected; false if the connection failed.
|
||||
*
|
||||
*
|
||||
* Throws errors in unrecoverable situations, or when this backend's query
|
||||
* is canceled.
|
||||
*/
|
||||
@@ -1030,6 +1034,61 @@ pageserver_disconnect_shard(shardno_t shard_no)
|
||||
shard->state = PS_Disconnected;
|
||||
}
|
||||
|
||||
// BEGIN HADRON
|
||||
/*
|
||||
* Nudge compute_ctl to refresh our configuration. Called when we suspect we may be
|
||||
* connecting to the wrong pageservers due to a stale configuration.
|
||||
*
|
||||
* This is a best-effort operation. If we couldn't send the local loopback HTTP request
|
||||
* to compute_ctl or if the request fails for any reason, we just log the error and move
|
||||
* on.
|
||||
*/
|
||||
|
||||
extern int hadron_extension_server_port;
|
||||
|
||||
static void
|
||||
hadron_request_configuration_refresh() {
|
||||
static CURL *handle = NULL;
|
||||
CURLcode res;
|
||||
char *compute_ctl_url;
|
||||
|
||||
if (!lakebase_mode)
|
||||
return;
|
||||
|
||||
if (handle == NULL)
|
||||
{
|
||||
handle = alloc_curl_handle();
|
||||
|
||||
curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "POST");
|
||||
curl_easy_setopt(handle, CURLOPT_TIMEOUT, 3L /* seconds */ );
|
||||
curl_easy_setopt(handle, CURLOPT_POSTFIELDS, "");
|
||||
}
|
||||
|
||||
// Set the URL
|
||||
compute_ctl_url = psprintf("http://localhost:%d/refresh_configuration", hadron_extension_server_port);
|
||||
|
||||
|
||||
elog(LOG, "Sending refresh configuration request to compute_ctl: %s", compute_ctl_url);
|
||||
|
||||
curl_easy_setopt(handle, CURLOPT_URL, compute_ctl_url);
|
||||
|
||||
res = curl_easy_perform(handle);
|
||||
if (res != CURLE_OK)
|
||||
{
|
||||
elog(WARNING, "compute_ctl refresh_configuration request failed: %s\n", curl_easy_strerror(res));
|
||||
}
|
||||
|
||||
// In regular Postgres usage, it is not necessary to manually free memory allocated by palloc (psprintf) because
|
||||
// it will be cleaned up after the "memory context" is reset (e.g. after the query or the transaction is finished).
|
||||
// However, the number of times this function gets called during a single query/transaction can be unbounded due to
|
||||
// the various retry loops around calls to pageservers. Therefore, we need to manually free this memory here.
|
||||
if (compute_ctl_url != NULL)
|
||||
{
|
||||
pfree(compute_ctl_url);
|
||||
}
|
||||
}
|
||||
// END HADRON
|
||||
|
||||
static bool
|
||||
pageserver_send(shardno_t shard_no, NeonRequest *request)
|
||||
{
|
||||
@@ -1064,6 +1123,9 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
|
||||
while (!pageserver_connect(shard_no, shard->n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
|
||||
{
|
||||
shard->n_reconnect_attempts += 1;
|
||||
if (shard->n_reconnect_attempts > conf_refresh_reconnect_attempt_threshold) {
|
||||
hadron_request_configuration_refresh();
|
||||
}
|
||||
}
|
||||
shard->n_reconnect_attempts = 0;
|
||||
} else {
|
||||
@@ -1171,17 +1233,26 @@ pageserver_receive(shardno_t shard_no)
|
||||
pfree(msg);
|
||||
pageserver_disconnect(shard_no);
|
||||
resp = NULL;
|
||||
|
||||
/*
|
||||
* Always poke compute_ctl to request a configuration refresh if we have issues receiving data from pageservers after
|
||||
* successfully connecting to it. It could be an indication that we are connecting to the wrong pageservers (e.g. PS
|
||||
* is in secondary mode or otherwise refuses to respond our request).
|
||||
*/
|
||||
hadron_request_configuration_refresh();
|
||||
}
|
||||
else if (rc == -2)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
|
||||
pageserver_disconnect(shard_no);
|
||||
hadron_request_configuration_refresh();
|
||||
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: could not read COPY data: %s", msg);
|
||||
}
|
||||
else
|
||||
{
|
||||
pageserver_disconnect(shard_no);
|
||||
hadron_request_configuration_refresh();
|
||||
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: unexpected PQgetCopyData return value: %d", rc);
|
||||
}
|
||||
|
||||
@@ -1249,18 +1320,21 @@ pageserver_try_receive(shardno_t shard_no)
|
||||
neon_shard_log(shard_no, LOG, "pageserver_receive disconnect: psql end of copy data: %s", pchomp(PQerrorMessage(pageserver_conn)));
|
||||
pageserver_disconnect(shard_no);
|
||||
resp = NULL;
|
||||
hadron_request_configuration_refresh();
|
||||
}
|
||||
else if (rc == -2)
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
|
||||
pageserver_disconnect(shard_no);
|
||||
hadron_request_configuration_refresh();
|
||||
neon_shard_log(shard_no, LOG, "pageserver_receive disconnect: could not read COPY data: %s", msg);
|
||||
resp = NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
pageserver_disconnect(shard_no);
|
||||
hadron_request_configuration_refresh();
|
||||
neon_shard_log(shard_no, ERROR, "pageserver_receive disconnect: unexpected PQgetCopyData return value: %d", rc);
|
||||
}
|
||||
|
||||
@@ -1460,6 +1534,16 @@ pg_init_libpagestore(void)
|
||||
PGC_SU_BACKEND,
|
||||
0, /* no flags required */
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomIntVariable("hadron.conf_refresh_reconnect_attempt_threshold",
|
||||
"Threshold of the number of consecutive failed pageserver "
|
||||
"connection attempts (per shard) before signaling "
|
||||
"compute_ctl for a configuration refresh.",
|
||||
NULL,
|
||||
&conf_refresh_reconnect_attempt_threshold,
|
||||
16, 0, INT_MAX,
|
||||
PGC_USERSET,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable("neon.pageserver_response_log_timeout",
|
||||
"pageserver response log timeout",
|
||||
|
||||
@@ -4936,9 +4936,19 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
# in the following commands.
|
||||
if safekeepers is not None:
|
||||
self.active_safekeepers = safekeepers
|
||||
self.env.neon_cli.endpoint_reconfigure(
|
||||
self.endpoint_id, self.tenant_id, pageserver_id, self.active_safekeepers
|
||||
)
|
||||
|
||||
start_time = time.time()
|
||||
while True:
|
||||
try:
|
||||
self.env.neon_cli.endpoint_reconfigure(
|
||||
self.endpoint_id, self.tenant_id, pageserver_id, self.active_safekeepers
|
||||
)
|
||||
return
|
||||
except RuntimeError as e:
|
||||
if time.time() - start_time > 120:
|
||||
raise e
|
||||
log.warning(f"Reconfigure failed with error: {e}. Retrying...")
|
||||
time.sleep(5)
|
||||
|
||||
def refresh_configuration(self):
|
||||
assert self.endpoint_id is not None
|
||||
|
||||
@@ -78,6 +78,9 @@ class Workload:
|
||||
"""
|
||||
if self._endpoint is not None:
|
||||
with ENDPOINT_LOCK:
|
||||
# It's important that we update config.json before issuing the reconfigure request to make sure
|
||||
# that PG-initiated spec refresh doesn't mess things up by reverting to the old spec.
|
||||
self._endpoint.update_pageservers_in_config()
|
||||
self._endpoint.reconfigure()
|
||||
|
||||
def endpoint(self, pageserver_id: int | None = None) -> Endpoint:
|
||||
@@ -97,10 +100,10 @@ class Workload:
|
||||
self._endpoint.start(pageserver_id=pageserver_id)
|
||||
self._configured_pageserver = pageserver_id
|
||||
else:
|
||||
if self._configured_pageserver != pageserver_id:
|
||||
self._configured_pageserver = pageserver_id
|
||||
self._endpoint.reconfigure(pageserver_id=pageserver_id)
|
||||
self._endpoint_config = pageserver_id
|
||||
# It's important that we update config.json before issuing the reconfigure request to make sure
|
||||
# that PG-initiated spec refresh doesn't mess things up by reverting to the old spec.
|
||||
self._endpoint.update_pageservers_in_config(pageserver_id=pageserver_id)
|
||||
self._endpoint.reconfigure(pageserver_id=pageserver_id)
|
||||
|
||||
connstring = self._endpoint.safe_psql(
|
||||
"SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'"
|
||||
|
||||
@@ -17,7 +17,7 @@ def reconfigure_endpoint(endpoint: Endpoint, pageserver_id: int, use_explicit_re
|
||||
# to make sure that PG-initiated config refresh doesn't mess things up by reverting to the old config.
|
||||
endpoint.update_pageservers_in_config(pageserver_id=pageserver_id)
|
||||
|
||||
# PG will eventually automatically refresh its configuration if it detects connectivity issues with pageservers.
|
||||
# PG will automatically refresh its configuration if it detects connectivity issues with pageservers.
|
||||
# We also allow the test to explicitly request a reconfigure so that the test can be sure that the
|
||||
# endpoint is running with the latest configuration.
|
||||
#
|
||||
|
||||
Reference in New Issue
Block a user