diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs
index f381337346..2c05938f44 100644
--- a/control_plane/src/bin/neon_local.rs
+++ b/control_plane/src/bin/neon_local.rs
@@ -848,20 +848,13 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
             let allow_multiple = sub_args.get_flag("allow-multiple");
 
-            // If --safekeepers argument is given, use only the listed safekeeper nodes.
-            let safekeepers =
-                if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
-                    let mut safekeepers: Vec<NodeId> = Vec::new();
-                    for sk_id in safekeepers_str.split(',').map(str::trim) {
-                        let sk_id = NodeId(u64::from_str(sk_id).map_err(|_| {
-                            anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list")
-                        })?);
-                        safekeepers.push(sk_id);
-                    }
-                    safekeepers
-                } else {
-                    env.safekeepers.iter().map(|sk| sk.id).collect()
-                };
+            // If the --safekeepers argument is given, use only the listed
+            // safekeeper nodes; otherwise all safekeepers from the env.
+            let safekeepers = if let Some(safekeepers) = parse_safekeepers(sub_args)? {
+                safekeepers
+            } else {
+                env.safekeepers.iter().map(|sk| sk.id).collect()
+            };
 
             let endpoint = cplane
                 .endpoints
@@ -965,7 +958,10 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
                     })
                     .collect::<Vec<_>>()
             };
-            endpoint.reconfigure(pageservers, None).await?;
+            // If the --safekeepers argument is given, use only the listed
+            // safekeeper nodes; otherwise leave the current set unchanged.
+            let safekeepers = parse_safekeepers(sub_args)?;
+            endpoint.reconfigure(pageservers, None, safekeepers).await?;
         }
         "stop" => {
             let endpoint_id = sub_args
@@ -987,6 +983,23 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
     Ok(())
 }
 
+/// Parse --safekeepers as a list of safekeeper ids.
+fn parse_safekeepers(sub_args: &ArgMatches) -> Result<Option<Vec<NodeId>>> {
+    if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
+        let mut safekeepers: Vec<NodeId> = Vec::new();
+        for sk_id in safekeepers_str.split(',').map(str::trim) {
+            let sk_id = NodeId(
+                u64::from_str(sk_id)
+                    .map_err(|_| anyhow!("invalid node ID \"{sk_id}\" in --safekeepers list"))?,
+            );
+            safekeepers.push(sk_id);
+        }
+        Ok(Some(safekeepers))
+    } else {
+        Ok(None)
+    }
+}
+
 fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
     let (sub_name, sub_args) = match sub_match.subcommand() {
         Some(ep_subcommand_data) => ep_subcommand_data,
@@ -1590,7 +1603,7 @@ fn cli() -> Command {
            .about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
            .arg(endpoint_id_arg.clone())
            .arg(endpoint_pageserver_id_arg.clone())
-           .arg(safekeepers_arg)
+           .arg(safekeepers_arg.clone())
            .arg(remote_ext_config_args)
            .arg(create_test_user)
            .arg(allow_multiple.clone())
@@ -1599,6 +1612,7 @@
        .subcommand(Command::new("reconfigure")
            .about("Reconfigure the endpoint")
            .arg(endpoint_pageserver_id_arg)
+           .arg(safekeepers_arg)
            .arg(endpoint_id_arg.clone())
            .arg(tenant_id_arg.clone())
        )
diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs
index b928bbfc30..f9bb2da7e7 100644
--- a/control_plane/src/endpoint.rs
+++ b/control_plane/src/endpoint.rs
@@ -499,6 +499,23 @@ impl Endpoint {
             .join(",")
     }
 
+    /// Map safekeeper ids to the actual connection strings.
+    fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
+        let mut safekeeper_connstrings = Vec::new();
+        if self.mode == ComputeMode::Primary {
+            for sk_id in sk_ids {
+                let sk = self
+                    .env
+                    .safekeepers
+                    .iter()
+                    .find(|node| node.id == sk_id)
+                    .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
+                safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
+            }
+        }
+        Ok(safekeeper_connstrings)
+    }
+
     pub async fn start(
         &self,
         auth_token: &Option<String>,
@@ -523,18 +540,7 @@ impl Endpoint {
         let pageserver_connstring = Self::build_pageserver_connstr(&pageservers);
         assert!(!pageserver_connstring.is_empty());
 
-        let mut safekeeper_connstrings = Vec::new();
-        if self.mode == ComputeMode::Primary {
-            for sk_id in safekeepers {
-                let sk = self
-                    .env
-                    .safekeepers
-                    .iter()
-                    .find(|node| node.id == sk_id)
-                    .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
-                safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
-            }
-        }
+        let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
 
         // check for file remote_extensions_spec.json
         // if it is present, read it and pass to compute_ctl
@@ -740,6 +746,7 @@ impl Endpoint {
         &self,
         mut pageservers: Vec<(Host, u16)>,
         stripe_size: Option<ShardStripeSize>,
+        safekeepers: Option<Vec<NodeId>>,
     ) -> Result<()> {
         let mut spec: ComputeSpec = {
             let spec_path = self.endpoint_path().join("spec.json");
@@ -774,6 +781,12 @@ impl Endpoint {
             spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
         }
 
+        // If safekeepers are not specified, don't change them.
+        if let Some(safekeepers) = safekeepers {
+            let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
+            spec.safekeeper_connstrings = safekeeper_connstrings;
+        }
+
         let client = reqwest::Client::builder()
             .timeout(Duration::from_secs(30))
             .build()
diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c
index da1a6f76f0..944b316344 100644
--- a/pgxn/neon/walproposer_pg.c
+++ b/pgxn/neon/walproposer_pg.c
@@ -63,6 +63,8 @@
 char	   *wal_acceptors_list = "";
 int			wal_acceptor_reconnect_timeout = 1000;
 int			wal_acceptor_connection_timeout = 10000;
 
+/* Set to true in the walproposer bgw. */
+static bool am_walproposer;
 static WalproposerShmemState *walprop_shared;
 static WalProposerConfig walprop_config;
 static XLogRecPtr sentPtr = InvalidXLogRecPtr;
@@ -76,6 +78,7 @@ static HotStandbyFeedback agg_hs_feedback;
 
 static void nwp_shmem_startup_hook(void);
 static void nwp_register_gucs(void);
+static void assign_neon_safekeepers(const char *newval, void *extra);
 static void nwp_prepare_shmem(void);
 static uint64 backpressure_lag_impl(void);
 static bool backpressure_throttling_impl(void);
@@ -111,7 +114,8 @@ init_walprop_config(bool syncSafekeepers)
 {
 	walprop_config.neon_tenant = neon_tenant;
 	walprop_config.neon_timeline = neon_timeline;
-	walprop_config.safekeepers_list = wal_acceptors_list;
+	/* WalProposerCreate scribbles directly on it, so pstrdup */
+	walprop_config.safekeepers_list = pstrdup(wal_acceptors_list);
 	walprop_config.safekeeper_reconnect_timeout = wal_acceptor_reconnect_timeout;
 	walprop_config.safekeeper_connection_timeout = wal_acceptor_connection_timeout;
 	walprop_config.wal_segment_size = wal_segment_size;
@@ -151,6 +155,7 @@ WalProposerMain(Datum main_arg)
 
 	init_walprop_config(false);
 	walprop_pg_init_bgworker();
+	am_walproposer = true;
 	walprop_pg_load_libpqwalreceiver();
 
 	wp = WalProposerCreate(&walprop_config, walprop_pg);
@@ -189,10 +194,10 @@ nwp_register_gucs(void)
							   NULL,	/* long_desc */
							   &wal_acceptors_list, /* valueAddr */
							   "",	/* bootValue */
-							   PGC_POSTMASTER,
+							   PGC_SIGHUP,
							   GUC_LIST_INPUT,	/* extensions can't use
												 * GUC_LIST_QUOTE */
-							   NULL, NULL, NULL);
+							   NULL, assign_neon_safekeepers, NULL);
 
 	DefineCustomIntVariable(
							"neon.safekeeper_reconnect_timeout",
@@ -215,6 +220,33 @@ nwp_register_gucs(void)
							NULL, NULL, NULL);
 }
 
+/*
+ * GUC assign_hook for neon.safekeepers. Restarts the walproposer through
+ * FATAL if the list changed.
+ */
+static void
+assign_neon_safekeepers(const char *newval, void *extra)
+{
+	if (!am_walproposer)
+		return;
+
+	if (!newval) {
+		/* should never happen */
+		wpg_log(FATAL, "neon.safekeepers is empty");
+	}
+
+	/*
+	 * TODO: restarting through FATAL is stupid and introduces a 1s delay
+	 * before the next bgw start. We should refactor walproposer to allow
+	 * graceful exit and thus remove this delay.
+	 */
+	if (strcmp(wal_acceptors_list, newval) != 0)
+	{
+		wpg_log(FATAL, "restarting walproposer to change safekeeper list from %s to %s",
+				wal_acceptors_list, newval);
+	}
+}
+
 /* Check if we need to suspend inserts because of lagging replication. */
 static uint64
 backpressure_lag_impl(void)
@@ -363,7 +395,7 @@ walprop_register_bgworker(void)
 	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "WalProposerMain");
 	snprintf(bgw.bgw_name, BGW_MAXLEN, "WAL proposer");
 	snprintf(bgw.bgw_type, BGW_MAXLEN, "WAL proposer");
-	bgw.bgw_restart_time = 5;
+	bgw.bgw_restart_time = 1;
 	bgw.bgw_notify_pid = 0;
 	bgw.bgw_main_arg = (Datum) 0;
 
@@ -1639,6 +1671,18 @@ walprop_pg_wait_event_set(WalProposer *wp, long timeout, Safekeeper **sk, uint32
 	late_cv_trigger = ConditionVariableCancelSleep();
 #endif
 
+	/*
+	 * Process config if requested. This restarts the walproposer if the
+	 * safekeepers list changed. Don't do that for sync-safekeepers because
+	 * quite probably it (re-reading config) won't work without some effort,
+	 * and sync-safekeepers should be quick to finish anyway.
+	 */
+	if (!wp->config->syncSafekeepers && ConfigReloadPending)
+	{
+		ConfigReloadPending = false;
+		ProcessConfigFile(PGC_SIGHUP);
+	}
+
 	/*
 	 * If wait is terminated by latch set (walsenders' latch is set on each
	 * wal flush). (no need for pm death check due to WL_EXIT_ON_PM_DEATH)
diff --git a/storage_controller/src/compute_hook.rs b/storage_controller/src/compute_hook.rs
index 4d0f8006aa..c46539485c 100644
--- a/storage_controller/src/compute_hook.rs
+++ b/storage_controller/src/compute_hook.rs
@@ -323,7 +323,7 @@ impl ComputeHook {
             if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
                 tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
                 endpoint
-                    .reconfigure(compute_pageservers.clone(), *stripe_size)
+                    .reconfigure(compute_pageservers.clone(), *stripe_size, None)
                     .await
                     .map_err(NotifyError::NeonLocal)?;
             }
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 6bfe1afd1f..a3f83abd3e 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -1933,6 +1933,7 @@ class NeonCli(AbstractNeonCli):
         endpoint_id: str,
         tenant_id: Optional[TenantId] = None,
         pageserver_id: Optional[int] = None,
+        safekeepers: Optional[List[int]] = None,
         check_return_code=True,
     ) -> "subprocess.CompletedProcess[str]":
         args = ["endpoint", "reconfigure", endpoint_id]
@@ -1940,6 +1941,8 @@ class NeonCli(AbstractNeonCli):
             args.extend(["--tenant-id", str(tenant_id)])
         if pageserver_id is not None:
             args.extend(["--pageserver-id", str(pageserver_id)])
+        if safekeepers is not None:
+            args.extend(["--safekeepers", (",".join(map(str, safekeepers)))])
         return self.raw_cli(args, check_return_code=check_return_code)
 
     def endpoint_stop(
@@ -3484,6 +3487,7 @@ class Endpoint(PgProtocol, LogUtils):
         self.pg_port = pg_port
         self.http_port = http_port
         self.check_stop_result = check_stop_result
+        # passed to endpoint create and endpoint reconfigure
         self.active_safekeepers: List[int] = list(map(lambda sk: sk.id, env.safekeepers))
         # path to conf is <repo_dir>/endpoints/<endpoint_id>/pgdata/postgresql.conf
@@ -3552,6 +3556,7 @@ class Endpoint(PgProtocol, LogUtils):
         self,
         remote_ext_config: Optional[str] = None,
         pageserver_id: Optional[int] = None,
+        safekeepers: Optional[List[int]] = None,
         allow_multiple: bool = False,
     ) -> "Endpoint":
         """
@@ -3561,6 +3566,11 @@ class Endpoint(PgProtocol, LogUtils):
         assert self.endpoint_id is not None
 
+        # If `safekeepers` is not None, remember them as active and use
+        # them in the following commands.
+        if safekeepers is not None:
+            self.active_safekeepers = safekeepers
+
         log.info(f"Starting postgres endpoint {self.endpoint_id}")
 
         self.env.neon_cli.endpoint_start(
@@ -3624,9 +3634,17 @@ class Endpoint(PgProtocol, LogUtils):
     def is_running(self):
         return self._running._value > 0
 
-    def reconfigure(self, pageserver_id: Optional[int] = None):
+    def reconfigure(
+        self, pageserver_id: Optional[int] = None, safekeepers: Optional[List[int]] = None
+    ):
         assert self.endpoint_id is not None
-        self.env.neon_cli.endpoint_reconfigure(self.endpoint_id, self.tenant_id, pageserver_id)
+        # If `safekeepers` is not None, remember them as active and use
+        # them in the following commands.
+        if safekeepers is not None:
+            self.active_safekeepers = safekeepers
+        self.env.neon_cli.endpoint_reconfigure(
+            self.endpoint_id, self.tenant_id, pageserver_id, self.active_safekeepers
+        )
 
     def respec(self, **kwargs):
         """Update the endpoint.json file used by control_plane."""
diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py
index ac1a3bef67..febfc10293 100644
--- a/test_runner/regress/test_wal_acceptor.py
+++ b/test_runner/regress/test_wal_acceptor.py
@@ -1725,7 +1725,10 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
 
 # Basic pull_timeline test.
-def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
+# When live_sk_change is False, compute is restarted to change the set of
+# safekeepers; otherwise it is a live reload.
+@pytest.mark.parametrize("live_sk_change", [False, True])
+def test_pull_timeline(neon_env_builder: NeonEnvBuilder, live_sk_change: bool):
     neon_env_builder.auth_enabled = True
 
     def execute_payload(endpoint: Endpoint):
@@ -1758,8 +1761,7 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
     log.info("Use only first 3 safekeepers")
     env.safekeepers[3].stop()
     endpoint = env.endpoints.create("main")
-    endpoint.active_safekeepers = [1, 2, 3]
-    endpoint.start()
+    endpoint.start(safekeepers=[1, 2, 3])
 
     execute_payload(endpoint)
     show_statuses(env.safekeepers, tenant_id, timeline_id)
@@ -1771,29 +1773,22 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
     log.info("Initialize new safekeeper 4, pull data from 1 & 3")
     env.safekeepers[3].start()
 
-    res = (
-        env.safekeepers[3]
-        .http_client(auth_token=env.auth_keys.generate_safekeeper_token())
-        .pull_timeline(
-            {
-                "tenant_id": str(tenant_id),
-                "timeline_id": str(timeline_id),
-                "http_hosts": [
-                    f"http://localhost:{env.safekeepers[0].port.http}",
-                    f"http://localhost:{env.safekeepers[2].port.http}",
-                ],
-            }
-        )
-    )
+    res = env.safekeepers[3].pull_timeline(
+        [env.safekeepers[0], env.safekeepers[2]], tenant_id, timeline_id
+    )
     log.info("Finished pulling timeline")
     log.info(res)
 
     show_statuses(env.safekeepers, tenant_id, timeline_id)
 
-    log.info("Restarting compute with new config to verify that it works")
-    endpoint.stop_and_destroy().create("main")
-    endpoint.active_safekeepers = [1, 3, 4]
-    endpoint.start()
+    action = "reconfiguring" if live_sk_change else "restarting"
+    log.info(f"{action} compute with new config to verify that it works")
+    new_sks = [1, 3, 4]
+    if not live_sk_change:
+        endpoint.stop_and_destroy().create("main")
+        endpoint.start(safekeepers=new_sks)
+    else:
+        endpoint.reconfigure(safekeepers=new_sks)
 
     execute_payload(endpoint)
    show_statuses(env.safekeepers, tenant_id, timeline_id)
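
Usage sketch (not part of the patch). A minimal test showing the intended flow of the new safekeepers knobs; the test name is hypothetical, and the pull_timeline step that would make safekeeper 4 usable (as in test_pull_timeline above) is elided:

    from fixtures.neon_fixtures import NeonEnvBuilder


    def test_live_safekeeper_change(neon_env_builder: NeonEnvBuilder):
        neon_env_builder.num_safekeepers = 4
        env = neon_env_builder.init_start()

        # Start on an explicit subset; the ids are remembered in
        # Endpoint.active_safekeepers and passed to `endpoint start`.
        endpoint = env.endpoints.create("main")
        endpoint.start(safekeepers=[1, 2, 3])

        # ... pull the timeline onto safekeeper 4 here ...

        # Live change, no compute restart: Endpoint::reconfigure rewrites
        # safekeeper_connstrings in spec.json and POSTs it to compute_ctl;
        # inside postgres the neon.safekeepers assign hook FATALs the
        # walproposer bgworker, which restarts (bgw_restart_time = 1s) and
        # connects to the new set.
        endpoint.reconfigure(safekeepers=[1, 3, 4])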
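
For reference, the CLI invocation the fixture builds (endpoint name assumed to be "main"; per parse_safekeepers the value is a comma-separated list of node ids, with whitespace around ids tolerated):

    neon_local endpoint reconfigure main --safekeepers 1,3,4

Omitting --safekeepers keeps the current set: parse_safekeepers returns None and Endpoint::reconfigure then skips the spec update, which is also what the storage controller's compute hook relies on when it passes None.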