Allow changing the compute safekeeper list without restart.

- Add a --safekeepers option to neon_local reconfigure.
- Add it to the Python Endpoint reconfigure.
- Implement config reload in walproposer by restarting the whole bgw when
  the safekeeper list changes.

ref https://github.com/neondatabase/neon/issues/6341
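In fixture terms, the commit enables a flow like the following sketch (hedged; `env`, the endpoint name, and the safekeeper ids are illustrative, assuming a NeonEnv with four safekeepers):

    # Sketch: move a running compute from safekeepers {1, 2, 3} to {1, 3, 4}
    # without restarting Postgres; only the walproposer bgw is restarted,
    # via FATAL in its GUC assign hook, when neon.safekeepers changes.
    endpoint = env.endpoints.create("main")
    endpoint.start(safekeepers=[1, 2, 3])        # remembered as active_safekeepers
    endpoint.reconfigure(safekeepers=[1, 3, 4])  # live switch, no compute restart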
@@ -848,20 +848,13 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
             let allow_multiple = sub_args.get_flag("allow-multiple");

-            // If --safekeepers argument is given, use only the listed safekeeper nodes.
-            let safekeepers =
-                if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
-                    let mut safekeepers: Vec<NodeId> = Vec::new();
-                    for sk_id in safekeepers_str.split(',').map(str::trim) {
-                        let sk_id = NodeId(u64::from_str(sk_id).map_err(|_| {
-                            anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list")
-                        })?);
-                        safekeepers.push(sk_id);
-                    }
-                    safekeepers
-                } else {
-                    env.safekeepers.iter().map(|sk| sk.id).collect()
-                };
+            // If --safekeepers argument is given, use only the listed
+            // safekeeper nodes; otherwise all from the env.
+            let safekeepers = if let Some(safekeepers) = parse_safekeepers(sub_args)? {
+                safekeepers
+            } else {
+                env.safekeepers.iter().map(|sk| sk.id).collect()
+            };

             let endpoint = cplane
                 .endpoints
@@ -965,7 +958,10 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
                         })
                         .collect::<Vec<_>>()
                 };
-            endpoint.reconfigure(pageservers, None).await?;
+            // If --safekeepers argument is given, use only the listed
+            // safekeeper nodes; otherwise all from the env.
+            let safekeepers = parse_safekeepers(sub_args)?;
+            endpoint.reconfigure(pageservers, None, safekeepers).await?;
         }
         "stop" => {
             let endpoint_id = sub_args
@@ -987,6 +983,23 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
     Ok(())
 }

+/// Parse --safekeepers as a list of safekeeper ids.
+fn parse_safekeepers(sub_args: &ArgMatches) -> Result<Option<Vec<NodeId>>> {
+    if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
+        let mut safekeepers: Vec<NodeId> = Vec::new();
+        for sk_id in safekeepers_str.split(',').map(str::trim) {
+            let sk_id = NodeId(
+                u64::from_str(sk_id)
+                    .map_err(|_| anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list"))?,
+            );
+            safekeepers.push(sk_id);
+        }
+        Ok(Some(safekeepers))
+    } else {
+        Ok(None)
+    }
+}
+
 fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
     let (sub_name, sub_args) = match sub_match.subcommand() {
         Some(ep_subcommand_data) => ep_subcommand_data,
@@ -1590,7 +1603,7 @@ fn cli() -> Command {
             .about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
             .arg(endpoint_id_arg.clone())
             .arg(endpoint_pageserver_id_arg.clone())
-            .arg(safekeepers_arg)
+            .arg(safekeepers_arg.clone())
             .arg(remote_ext_config_args)
             .arg(create_test_user)
             .arg(allow_multiple.clone())
@@ -1599,6 +1612,7 @@ fn cli() -> Command {
         .subcommand(Command::new("reconfigure")
             .about("Reconfigure the endpoint")
             .arg(endpoint_pageserver_id_arg)
+            .arg(safekeepers_arg)
             .arg(endpoint_id_arg.clone())
             .arg(tenant_id_arg.clone())
         )
@@ -499,6 +499,23 @@ impl Endpoint {
             .join(",")
     }

+    /// Map safekeeper ids to their actual connection strings.
+    fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
+        let mut safekeeper_connstrings = Vec::new();
+        if self.mode == ComputeMode::Primary {
+            for sk_id in sk_ids {
+                let sk = self
+                    .env
+                    .safekeepers
+                    .iter()
+                    .find(|node| node.id == sk_id)
+                    .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
+                safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
+            }
+        }
+        Ok(safekeeper_connstrings)
+    }
+
     pub async fn start(
         &self,
         auth_token: &Option<String>,
@@ -523,18 +540,7 @@ impl Endpoint {
         let pageserver_connstring = Self::build_pageserver_connstr(&pageservers);
         assert!(!pageserver_connstring.is_empty());

-        let mut safekeeper_connstrings = Vec::new();
-        if self.mode == ComputeMode::Primary {
-            for sk_id in safekeepers {
-                let sk = self
-                    .env
-                    .safekeepers
-                    .iter()
-                    .find(|node| node.id == sk_id)
-                    .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
-                safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
-            }
-        }
+        let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;

         // check for file remote_extensions_spec.json
         // if it is present, read it and pass to compute_ctl
@@ -740,6 +746,7 @@ impl Endpoint {
         &self,
         mut pageservers: Vec<(Host, u16)>,
         stripe_size: Option<ShardStripeSize>,
+        safekeepers: Option<Vec<NodeId>>,
     ) -> Result<()> {
         let mut spec: ComputeSpec = {
             let spec_path = self.endpoint_path().join("spec.json");
@@ -774,6 +781,12 @@ impl Endpoint {
             spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
         }

+        // If safekeepers are not specified, don't change them.
+        if let Some(safekeepers) = safekeepers {
+            let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
+            spec.safekeeper_connstrings = safekeeper_connstrings;
+        }
+
         let client = reqwest::Client::builder()
             .timeout(Duration::from_secs(30))
             .build()
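Note the Option semantics of the new `safekeepers` parameter: `None` leaves the spec's safekeeper_connstrings untouched, while `Some(list)` rebuilds them via build_safekeepers_connstrs. Through the Python fixture that looks roughly like this (illustrative sketch, hypothetical ids):

    endpoint.reconfigure(pageserver_id=1)        # safekeepers omitted: list unchanged
    endpoint.reconfigure(safekeepers=[1, 3, 4])  # safekeeper list replaced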
@@ -63,6 +63,8 @@ char *wal_acceptors_list = "";
 int		wal_acceptor_reconnect_timeout = 1000;
 int		wal_acceptor_connection_timeout = 10000;

+/* Set to true in the walproposer bgw. */
+static bool am_walproposer;
 static WalproposerShmemState *walprop_shared;
 static WalProposerConfig walprop_config;
 static XLogRecPtr sentPtr = InvalidXLogRecPtr;
@@ -76,6 +78,7 @@ static HotStandbyFeedback agg_hs_feedback;

 static void nwp_shmem_startup_hook(void);
 static void nwp_register_gucs(void);
+static void assign_neon_safekeepers(const char *newval, void *extra);
 static void nwp_prepare_shmem(void);
 static uint64 backpressure_lag_impl(void);
 static bool backpressure_throttling_impl(void);
@@ -111,7 +114,8 @@ init_walprop_config(bool syncSafekeepers)
 {
 	walprop_config.neon_tenant = neon_tenant;
 	walprop_config.neon_timeline = neon_timeline;
-	walprop_config.safekeepers_list = wal_acceptors_list;
+	/* WalProposerCreate scribbles directly on it, so pstrdup */
+	walprop_config.safekeepers_list = pstrdup(wal_acceptors_list);
 	walprop_config.safekeeper_reconnect_timeout = wal_acceptor_reconnect_timeout;
 	walprop_config.safekeeper_connection_timeout = wal_acceptor_connection_timeout;
 	walprop_config.wal_segment_size = wal_segment_size;
@@ -151,6 +155,7 @@ WalProposerMain(Datum main_arg)

 	init_walprop_config(false);
 	walprop_pg_init_bgworker();
+	am_walproposer = true;
 	walprop_pg_load_libpqwalreceiver();

 	wp = WalProposerCreate(&walprop_config, walprop_pg);
@@ -189,10 +194,10 @@ nwp_register_gucs(void)
 		NULL,				/* long_desc */
 		&wal_acceptors_list,		/* valueAddr */
 		"",				/* bootValue */
-		PGC_POSTMASTER,
+		PGC_SIGHUP,
 		GUC_LIST_INPUT,			/* extensions can't use
 						 * GUC_LIST_QUOTE */
-		NULL, NULL, NULL);
+		NULL, assign_neon_safekeepers, NULL);

 	DefineCustomIntVariable(
 		"neon.safekeeper_reconnect_timeout",
@@ -215,6 +220,33 @@ nwp_register_gucs(void)
 		NULL, NULL, NULL);
 }

+/*
+ * GUC assign_hook for neon.safekeepers. Restarts walproposer through FATAL if
+ * the list changed.
+ */
+static void
+assign_neon_safekeepers(const char *newval, void *extra)
+{
+	if (!am_walproposer)
+		return;
+
+	if (!newval) {
+		/* should never happen */
+		wpg_log(FATAL, "neon.safekeepers is empty");
+	}
+
+	/*
+	 * TODO: restarting through FATAL is stupid and introduces a 1s delay
+	 * before the next bgw start. We should refactor walproposer to allow
+	 * graceful exit and thus remove this delay.
+	 */
+	if (strcmp(wal_acceptors_list, newval) != 0)
+	{
+		wpg_log(FATAL, "restarting walproposer to change safekeeper list from %s to %s",
+				wal_acceptors_list, newval);
+	}
+}
+
 /* Check if we need to suspend inserts because of lagging replication. */
 static uint64
 backpressure_lag_impl(void)
@@ -363,7 +395,7 @@ walprop_register_bgworker(void)
 	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "WalProposerMain");
 	snprintf(bgw.bgw_name, BGW_MAXLEN, "WAL proposer");
 	snprintf(bgw.bgw_type, BGW_MAXLEN, "WAL proposer");
-	bgw.bgw_restart_time = 5;
+	bgw.bgw_restart_time = 1;
 	bgw.bgw_notify_pid = 0;
 	bgw.bgw_main_arg = (Datum) 0;
@@ -1639,6 +1671,18 @@ walprop_pg_wait_event_set(WalProposer *wp, long timeout, Safekeeper **sk, uint32
 	late_cv_trigger = ConditionVariableCancelSleep();
 #endif

+	/*
+	 * Process config if requested. This restarts walproposer if the
+	 * safekeeper list changed. Don't do that for sync-safekeepers because
+	 * re-reading the config quite probably won't work without some effort,
+	 * and sync-safekeepers should be quick to finish anyway.
+	 */
+	if (!wp->config->syncSafekeepers && ConfigReloadPending)
+	{
+		ConfigReloadPending = false;
+		ProcessConfigFile(PGC_SIGHUP);
+	}
+
 	/*
 	 * If wait is terminated by latch set (walsenders' latch is set on each
 	 * wal flush). (no need for pm death check due to WL_EXIT_ON_PM_DEATH)
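The reload path above can also be exercised by hand. A hedged sketch (assumes a started primary endpoint from the Python fixtures, its `safe_psql` helper, and superuser access; the connection strings are made up):

    # Change neon.safekeepers and signal SIGHUP via pg_reload_conf(); the
    # assign hook then FATALs in the walproposer bgw, and the postmaster
    # restarts it (bgw_restart_time = 1) with the new list.
    endpoint.safe_psql("ALTER SYSTEM SET neon.safekeepers = '127.0.0.1:5454,127.0.0.1:5456'")
    endpoint.safe_psql("SELECT pg_reload_conf()")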
@@ -323,7 +323,7 @@ impl ComputeHook {
         if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
             tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
             endpoint
-                .reconfigure(compute_pageservers.clone(), *stripe_size)
+                .reconfigure(compute_pageservers.clone(), *stripe_size, None)
                 .await
                 .map_err(NotifyError::NeonLocal)?;
         }
@@ -1933,6 +1933,7 @@ class NeonCli(AbstractNeonCli):
         endpoint_id: str,
         tenant_id: Optional[TenantId] = None,
         pageserver_id: Optional[int] = None,
+        safekeepers: Optional[List[int]] = None,
         check_return_code=True,
     ) -> "subprocess.CompletedProcess[str]":
         args = ["endpoint", "reconfigure", endpoint_id]
@@ -1940,6 +1941,8 @@ class NeonCli(AbstractNeonCli):
             args.extend(["--tenant-id", str(tenant_id)])
         if pageserver_id is not None:
             args.extend(["--pageserver-id", str(pageserver_id)])
+        if safekeepers is not None:
+            args.extend(["--safekeepers", (",".join(map(str, safekeepers)))])
         return self.raw_cli(args, check_return_code=check_return_code)

     def endpoint_stop(
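For reference, the wrapper above serializes the list as a comma-joined flag; with `safekeepers=[1, 3, 4]` it composes roughly this argv (endpoint id illustrative):

    ["endpoint", "reconfigure", "ep-main", "--safekeepers", "1,3,4"]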
@@ -3484,6 +3487,7 @@ class Endpoint(PgProtocol, LogUtils):
         self.pg_port = pg_port
         self.http_port = http_port
         self.check_stop_result = check_stop_result
+        # passed to endpoint create and endpoint reconfigure
         self.active_safekeepers: List[int] = list(map(lambda sk: sk.id, env.safekeepers))
         # path to conf is <repo_dir>/endpoints/<endpoint_id>/pgdata/postgresql.conf
@@ -3552,6 +3556,7 @@ class Endpoint(PgProtocol, LogUtils):
         self,
         remote_ext_config: Optional[str] = None,
         pageserver_id: Optional[int] = None,
+        safekeepers: Optional[List[int]] = None,
         allow_multiple: bool = False,
     ) -> "Endpoint":
         """
@@ -3561,6 +3566,11 @@ class Endpoint(PgProtocol, LogUtils):

         assert self.endpoint_id is not None

+        # If `safekeepers` is not None, remember them as active and use
+        # them in the following commands.
+        if safekeepers is not None:
+            self.active_safekeepers = safekeepers
+
         log.info(f"Starting postgres endpoint {self.endpoint_id}")

         self.env.neon_cli.endpoint_start(
@@ -3624,9 +3634,17 @@ class Endpoint(PgProtocol, LogUtils):
     def is_running(self):
         return self._running._value > 0

-    def reconfigure(self, pageserver_id: Optional[int] = None):
+    def reconfigure(
+        self, pageserver_id: Optional[int] = None, safekeepers: Optional[List[int]] = None
+    ):
         assert self.endpoint_id is not None
-        self.env.neon_cli.endpoint_reconfigure(self.endpoint_id, self.tenant_id, pageserver_id)
+        # If `safekeepers` is not None, remember them as active and use
+        # them in the following commands.
+        if safekeepers is not None:
+            self.active_safekeepers = safekeepers
+        self.env.neon_cli.endpoint_reconfigure(
+            self.endpoint_id, self.tenant_id, pageserver_id, self.active_safekeepers
+        )

     def respec(self, **kwargs):
         """Update the endpoint.json file used by control_plane."""
@@ -1725,7 +1725,10 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):


 # Basic pull_timeline test.
-def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
+# When live_sk_change is False, the compute is restarted to change the set of
+# safekeepers; otherwise it is a live reload.
+@pytest.mark.parametrize("live_sk_change", [False, True])
+def test_pull_timeline(neon_env_builder: NeonEnvBuilder, live_sk_change: bool):
     neon_env_builder.auth_enabled = True

     def execute_payload(endpoint: Endpoint):
@@ -1758,8 +1761,7 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder, live_sk_change: bool):
     log.info("Use only first 3 safekeepers")
     env.safekeepers[3].stop()
     endpoint = env.endpoints.create("main")
-    endpoint.active_safekeepers = [1, 2, 3]
-    endpoint.start()
+    endpoint.start(safekeepers=[1, 2, 3])

     execute_payload(endpoint)
     show_statuses(env.safekeepers, tenant_id, timeline_id)
@@ -1771,29 +1773,22 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder, live_sk_change: bool):
     log.info("Initialize new safekeeper 4, pull data from 1 & 3")
     env.safekeepers[3].start()

-    res = (
-        env.safekeepers[3]
-        .http_client(auth_token=env.auth_keys.generate_safekeeper_token())
-        .pull_timeline(
-            {
-                "tenant_id": str(tenant_id),
-                "timeline_id": str(timeline_id),
-                "http_hosts": [
-                    f"http://localhost:{env.safekeepers[0].port.http}",
-                    f"http://localhost:{env.safekeepers[2].port.http}",
-                ],
-            }
-        )
-    )
+    res = env.safekeepers[3].pull_timeline(
+        [env.safekeepers[0], env.safekeepers[2]], tenant_id, timeline_id
+    )
     log.info("Finished pulling timeline")
     log.info(res)

     show_statuses(env.safekeepers, tenant_id, timeline_id)

-    log.info("Restarting compute with new config to verify that it works")
-    endpoint.stop_and_destroy().create("main")
-    endpoint.active_safekeepers = [1, 3, 4]
-    endpoint.start()
+    action = "reconfiguring" if live_sk_change else "restarting"
+    log.info(f"{action} compute with new config to verify that it works")
+    new_sks = [1, 3, 4]
+    if not live_sk_change:
+        endpoint.stop_and_destroy().create("main")
+        endpoint.start(safekeepers=new_sks)
+    else:
+        endpoint.reconfigure(safekeepers=new_sks)

     execute_payload(endpoint)
     show_statuses(env.safekeepers, tenant_id, timeline_id)