From 90cd5a5be85100dd27a782d0863db19bba498ef9 Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Thu, 24 Jul 2025 09:26:21 -0500 Subject: [PATCH] [BRC-1778] Add mechanism to `compute_ctl` to pull a new config (#12711) ## Problem We have been dealing with a number of issues with the SC compute notification mechanism. Various race conditions exist in the PG/HCC/cplane/PS distributed system, and relying on the SC to send notifications to the compute node to notify it of PS changes is not robust. We decided to pursue a more robust option where the compute node itself discovers whether it may be pointing to the incorrect PSs and proactively reconfigure itself if issues are suspected. ## Summary of changes To support this self-healing reconfiguration mechanism several pieces are needed. This PR adds a mechanism to `compute_ctl` called "refresh configuration", where the compute node reaches out to the control plane to pull a new config and reconfigure PG using the new config, instead of listening for a notification message containing a config to arrive from the control plane. Main changes to compute_ctl: 1. The `compute_ctl` state machine now has a new State, `RefreshConfigurationPending`. The compute node may enter this state upon receiving a signal that it may be using the incorrect page servers. 2. Upon entering the `RefreshConfigurationPending` state, the background configurator thread in `compute_ctl` wakes up, pulls a new config from the control plane, and reconfigures PG (with `pg_ctl reload`) according to the new config. 3. The compute node may enter the new `RefreshConfigurationPending` state from `Running` or `Failed` states. If the configurator managed to configure the compute node successfully, it will enter the `Running` state, otherwise, it stays in `RefreshConfigurationPending` and the configurator thread will wait for the next notification if an incorrect config is still suspected. 4. Added various plumbing in `compute_ctl` data structures to allow the configurator thread to perform the config fetch. The "incorrect config suspected" notification is delivered using a HTTP endpoint, `/refresh_configuration`, on `compute_ctl`. This endpoint is currently not called by anyone other than the tests. In a follow up PR I will set up some code in the PG extension/libpagestore to call this HTTP endpoint whenever PG suspects that it is pointing to the wrong page servers. ## How is this tested? Modified `test_runner/regress/test_change_pageserver.py` to add a scenario where we use the new `/refresh_configuration` mechanism instead of the existing `/configure` mechanism (which requires us sending a full config to compute_ctl) to have the compute node reload and reconfigure its pageservers. I took one shortcut to reduce the scope of this change when it comes to testing: the compute node uses a local config file instead of pulling a config over the network from the HCC. This simplifies the test setup in the following ways: * The existing test framework is set up to use local config files for compute nodes only, so it's convenient if I just stick with it. * The HCC today generates a compute config with production settings (e.g., assuming 4 CPUs, 16GB RAM, with local file caches), which is probably not suitable in tests. We may need to add another test-only endpoint config to the control plane to make this work. The config-fetch part of the code is relatively straightforward (and well-covered in both production and the KIND test) so it is probably fine to replace it with loading from the local config file for these integration tests. In addition to making sure that the tests pass, I also manually inspected the logs to make sure that the compute node is indeed reloading the config using the new mechanism instead of going down the old `/configure` path (it turns out the test has bugs which causes compute `/configure` messages to be sent despite the test intending to disable/blackhole them). ```test 2024-09-24T18:53:29.573650Z INFO http request{otel.name=/refresh_configuration http.method=POST}: serving /refresh_configuration POST request 2024-09-24T18:53:29.573689Z INFO configurator_main_loop: compute node suspects its configuration is out of date, now refreshing configuration 2024-09-24T18:53:29.573706Z INFO configurator_main_loop: reloading config.json from path: /workspaces/hadron/test_output/test_change_pageserver_using_refresh[release-pg16]/repo/endpoints/ep-1/spec.json PG:2024-09-24 18:53:29.574 GMT [52799] LOG: received SIGHUP, reloading configuration files PG:2024-09-24 18:53:29.575 GMT [52799] LOG: parameter "neon.extension_server_port" cannot be changed without restarting the server PG:2024-09-24 18:53:29.575 GMT [52799] LOG: parameter "neon.pageserver_connstring" changed to "postgresql://no_user@localhost:15008" ... ``` Co-authored-by: William Huang --- compute_tools/README.md | 6 + compute_tools/src/bin/compute_ctl.rs | 3 + compute_tools/src/compute.rs | 12 +- compute_tools/src/configurator.rs | 104 ++++++++++++++++-- .../src/http/routes/refresh_configuration.rs | 20 ++-- compute_tools/src/http/server.rs | 4 +- control_plane/src/bin/neon_local.rs | 65 +++++++++++ control_plane/src/endpoint.rs | 53 ++++++++- libs/compute_api/src/responses.rs | 5 + test_runner/fixtures/neon_cli.py | 25 +++++ test_runner/fixtures/neon_fixtures.py | 8 ++ test_runner/regress/test_change_pageserver.py | 42 ++++++- 12 files changed, 315 insertions(+), 32 deletions(-) diff --git a/compute_tools/README.md b/compute_tools/README.md index 49f1368f0e..446b441c18 100644 --- a/compute_tools/README.md +++ b/compute_tools/README.md @@ -52,8 +52,14 @@ stateDiagram-v2 Init --> Running : Started Postgres Running --> TerminationPendingFast : Requested termination Running --> TerminationPendingImmediate : Requested termination + Running --> ConfigurationPending : Received a /configure request with spec + Running --> RefreshConfigurationPending : Received a /refresh_configuration request, compute node will pull a new spec and reconfigure + RefreshConfigurationPending --> Running : Compute has been re-configured TerminationPendingFast --> Terminated compute with 30s delay for cplane to inspect status TerminationPendingImmediate --> Terminated : Terminated compute immediately + Running --> TerminationPending : Requested termination + TerminationPending --> Terminated : Terminated compute + Failed --> RefreshConfigurationPending : Received a /refresh_configuration request Failed --> [*] : Compute exited Terminated --> [*] : Compute exited ``` diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index ee8a504429..83a2e6dc68 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -235,6 +235,9 @@ fn main() -> Result<()> { pg_isready_bin: get_pg_isready_bin(&cli.pgbin), instance_id: std::env::var("INSTANCE_ID").ok(), lakebase_mode: cli.lakebase_mode, + build_tag: BUILD_TAG.to_string(), + control_plane_uri: cli.control_plane_uri, + config_path_test_only: cli.config, }, config, )?; diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 56bf7b8632..e3ac887e9c 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -21,6 +21,7 @@ use postgres::NoTls; use postgres::error::SqlState; use remote_storage::{DownloadError, RemotePath}; use std::collections::{HashMap, HashSet}; +use std::ffi::OsString; use std::os::unix::fs::{PermissionsExt, symlink}; use std::path::Path; use std::process::{Command, Stdio}; @@ -120,6 +121,10 @@ pub struct ComputeNodeParams { // Path to the `pg_isready` binary. pub pg_isready_bin: String, pub lakebase_mode: bool, + + pub build_tag: String, + pub control_plane_uri: Option, + pub config_path_test_only: Option, } type TaskHandle = Mutex>>; @@ -1796,12 +1801,12 @@ impl ComputeNode { let states_allowing_configuration_refresh = [ ComputeStatus::Running, ComputeStatus::Failed, - // ComputeStatus::RefreshConfigurationPending, + ComputeStatus::RefreshConfigurationPending, ]; - let state = self.state.lock().expect("state lock poisoned"); + let mut state = self.state.lock().expect("state lock poisoned"); if states_allowing_configuration_refresh.contains(&state.status) { - // state.status = ComputeStatus::RefreshConfigurationPending; + state.status = ComputeStatus::RefreshConfigurationPending; self.state_changed.notify_all(); Ok(()) } else if state.status == ComputeStatus::Init { @@ -1988,6 +1993,7 @@ impl ComputeNode { // wait ComputeStatus::Init | ComputeStatus::Configuration + | ComputeStatus::RefreshConfigurationPending | ComputeStatus::Empty => { state = self.state_changed.wait(state).unwrap(); } diff --git a/compute_tools/src/configurator.rs b/compute_tools/src/configurator.rs index d97bd37285..864335fd2c 100644 --- a/compute_tools/src/configurator.rs +++ b/compute_tools/src/configurator.rs @@ -1,10 +1,12 @@ -use std::sync::Arc; +use std::fs::File; use std::thread; +use std::{path::Path, sync::Arc}; -use compute_api::responses::ComputeStatus; +use compute_api::responses::{ComputeConfig, ComputeStatus}; use tracing::{error, info, instrument}; -use crate::compute::ComputeNode; +use crate::compute::{ComputeNode, ParsedSpec}; +use crate::spec::get_config_from_control_plane; #[instrument(skip_all)] fn configurator_main_loop(compute: &Arc) { @@ -12,12 +14,22 @@ fn configurator_main_loop(compute: &Arc) { loop { let mut state = compute.state.lock().unwrap(); - // We have to re-check the status after re-acquiring the lock because it could be that - // the status has changed while we were waiting for the lock, and we might not need to - // wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e. - // we are waiting for a condition variable that will never be signaled. - if state.status != ComputeStatus::ConfigurationPending { - state = compute.state_changed.wait(state).unwrap(); + if compute.params.lakebase_mode { + while state.status != ComputeStatus::ConfigurationPending + && state.status != ComputeStatus::RefreshConfigurationPending + && state.status != ComputeStatus::Failed + { + info!("configurator: compute status: {:?}, sleeping", state.status); + state = compute.state_changed.wait(state).unwrap(); + } + } else { + // We have to re-check the status after re-acquiring the lock because it could be that + // the status has changed while we were waiting for the lock, and we might not need to + // wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e. + // we are waiting for a condition variable that will never be signaled. + if state.status != ComputeStatus::ConfigurationPending { + state = compute.state_changed.wait(state).unwrap(); + } } // Re-check the status after waking up @@ -38,6 +50,80 @@ fn configurator_main_loop(compute: &Arc) { // std::thread::sleep(std::time::Duration::from_millis(10000)); compute.set_status(new_status); + } else if state.status == ComputeStatus::RefreshConfigurationPending { + info!( + "compute node suspects its configuration is out of date, now refreshing configuration" + ); + // Drop the lock guard here to avoid holding the lock while downloading spec from the control plane / HCC. + // This is the only thread that can move compute_ctl out of the `RefreshConfigurationPending` state, so it + // is safe to drop the lock like this. + drop(state); + + let spec = if let Some(config_path) = &compute.params.config_path_test_only { + // This path is only to make testing easier. In production we always get the spec from the HCC. + info!( + "reloading config.json from path: {}", + config_path.to_string_lossy() + ); + let path = Path::new(config_path); + if let Ok(file) = File::open(path) { + match serde_json::from_reader::(file) { + Ok(config) => config.spec, + Err(e) => { + error!("could not parse spec file: {}", e); + None + } + } + } else { + error!( + "could not open config file at path: {}", + config_path.to_string_lossy() + ); + None + } + } else if let Some(control_plane_uri) = &compute.params.control_plane_uri { + match get_config_from_control_plane(control_plane_uri, &compute.params.compute_id) { + Ok(config) => config.spec, + Err(e) => { + error!("could not get config from control plane: {}", e); + None + } + } + } else { + None + }; + + if let Some(spec) = spec { + if let Ok(pspec) = ParsedSpec::try_from(spec) { + { + let mut state = compute.state.lock().unwrap(); + // Defensive programming to make sure this thread is indeed the only one that can move the compute + // node out of the `RefreshConfigurationPending` state. Would be nice if we can encode this invariant + // into the type system. + assert_eq!(state.status, ComputeStatus::RefreshConfigurationPending); + // state.pspec is consumed by compute.reconfigure() below. Note that compute.reconfigure() will acquire + // the compute.state lock again so we need to have the lock guard go out of scope here. We could add a + // "locked" variant of compute.reconfigure() that takes the lock guard as an argument to make this cleaner, + // but it's not worth forking the codebase too much for this minor point alone right now. + state.pspec = Some(pspec); + } + match compute.reconfigure() { + Ok(_) => { + info!("Refresh configuration: compute node configured"); + compute.set_status(ComputeStatus::Running); + } + Err(e) => { + error!( + "Refresh configuration: could not configure compute node: {}", + e + ); + // Leave the compute node in the `RefreshConfigurationPending` state if the configuration + // was not successful. It should be okay to treat this situation the same as if the loop + // hasn't executed yet as long as the detection side keeps notifying. + } + } + } + } } else if state.status == ComputeStatus::Failed { info!("compute node is now in Failed state, exiting"); break; diff --git a/compute_tools/src/http/routes/refresh_configuration.rs b/compute_tools/src/http/routes/refresh_configuration.rs index d00f5a285a..512abaa0a6 100644 --- a/compute_tools/src/http/routes/refresh_configuration.rs +++ b/compute_tools/src/http/routes/refresh_configuration.rs @@ -7,28 +7,22 @@ use axum::{ response::{IntoResponse, Response}, }; use http::StatusCode; -use tracing::debug; use crate::compute::ComputeNode; // use crate::hadron_metrics::POSTGRES_PAGESTREAM_REQUEST_ERRORS; use crate::http::JsonResponse; -// The /refresh_configuration POST method is used to nudge compute_ctl to pull a new spec -// from the HCC and attempt to reconfigure Postgres with the new spec. The method does not wait -// for the reconfiguration to complete. Rather, it simply delivers a signal that will cause -// configuration to be reloaded in a best effort manner. Invocation of this method does not -// guarantee that a reconfiguration will occur. The caller should consider keep sending this -// request while it believes that the compute configuration is out of date. +/// The /refresh_configuration POST method is used to nudge compute_ctl to pull a new spec +/// from the HCC and attempt to reconfigure Postgres with the new spec. The method does not wait +/// for the reconfiguration to complete. Rather, it simply delivers a signal that will cause +/// configuration to be reloaded in a best effort manner. Invocation of this method does not +/// guarantee that a reconfiguration will occur. The caller should consider keep sending this +/// request while it believes that the compute configuration is out of date. pub(in crate::http) async fn refresh_configuration( State(compute): State>, ) -> Response { - debug!("serving /refresh_configuration POST request"); - // POSTGRES_PAGESTREAM_REQUEST_ERRORS.inc(); match compute.signal_refresh_configuration().await { Ok(_) => StatusCode::OK.into_response(), - Err(e) => { - tracing::error!("error handling /refresh_configuration request: {}", e); - JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e) - } + Err(e) => JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e), } } diff --git a/compute_tools/src/http/server.rs b/compute_tools/src/http/server.rs index 9901a6de07..2fd3121f4f 100644 --- a/compute_tools/src/http/server.rs +++ b/compute_tools/src/http/server.rs @@ -23,11 +23,11 @@ use super::{ middleware::authorize::Authorize, routes::{ check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions, - grants, insights, lfc, metrics, metrics_json, promote, status, terminate, + grants, hadron_liveness_probe, insights, lfc, metrics, metrics_json, promote, + refresh_configuration, status, terminate, }, }; use crate::compute::ComputeNode; -use crate::http::routes::{hadron_liveness_probe, refresh_configuration}; /// `compute_ctl` has two servers: internal and external. The internal server /// binds to the loopback interface and handles communication from clients on diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index f68bc1ed48..c126835066 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -560,7 +560,9 @@ enum EndpointCmd { Create(EndpointCreateCmdArgs), Start(EndpointStartCmdArgs), Reconfigure(EndpointReconfigureCmdArgs), + RefreshConfiguration(EndpointRefreshConfigurationArgs), Stop(EndpointStopCmdArgs), + UpdatePageservers(EndpointUpdatePageserversCmdArgs), GenerateJwt(EndpointGenerateJwtCmdArgs), } @@ -721,6 +723,13 @@ struct EndpointReconfigureCmdArgs { safekeepers: Option, } +#[derive(clap::Args)] +#[clap(about = "Refresh the endpoint's configuration by forcing it reload it's spec")] +struct EndpointRefreshConfigurationArgs { + #[clap(help = "Postgres endpoint id")] + endpoint_id: String, +} + #[derive(clap::Args)] #[clap(about = "Stop an endpoint")] struct EndpointStopCmdArgs { @@ -738,6 +747,16 @@ struct EndpointStopCmdArgs { mode: EndpointTerminateMode, } +#[derive(clap::Args)] +#[clap(about = "Update the pageservers in the spec file of the compute endpoint")] +struct EndpointUpdatePageserversCmdArgs { + #[clap(help = "Postgres endpoint id")] + endpoint_id: String, + + #[clap(short = 'p', long, help = "Specified pageserver id")] + pageserver_id: Option, +} + #[derive(clap::Args)] #[clap(about = "Generate a JWT for an endpoint")] struct EndpointGenerateJwtCmdArgs { @@ -1625,6 +1644,44 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res println!("Starting existing endpoint {endpoint_id}..."); endpoint.start(args).await?; } + EndpointCmd::UpdatePageservers(args) => { + let endpoint_id = &args.endpoint_id; + let endpoint = cplane + .endpoints + .get(endpoint_id.as_str()) + .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?; + let pageservers = match args.pageserver_id { + Some(pageserver_id) => { + let pageserver = + PageServerNode::from_env(env, env.get_pageserver_conf(pageserver_id)?); + + vec![( + PageserverProtocol::Libpq, + pageserver.pg_connection_config.host().clone(), + pageserver.pg_connection_config.port(), + )] + } + None => { + let storage_controller = StorageController::from_env(env); + storage_controller + .tenant_locate(endpoint.tenant_id) + .await? + .shards + .into_iter() + .map(|shard| { + ( + PageserverProtocol::Libpq, + Host::parse(&shard.listen_pg_addr) + .expect("Storage controller reported malformed host"), + shard.listen_pg_port, + ) + }) + .collect::>() + } + }; + + endpoint.update_pageservers_in_config(pageservers).await?; + } EndpointCmd::Reconfigure(args) => { let endpoint_id = &args.endpoint_id; let endpoint = cplane @@ -1678,6 +1735,14 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res .reconfigure(Some(pageservers), None, safekeepers, None) .await?; } + EndpointCmd::RefreshConfiguration(args) => { + let endpoint_id = &args.endpoint_id; + let endpoint = cplane + .endpoints + .get(endpoint_id.as_str()) + .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?; + endpoint.refresh_configuration().await?; + } EndpointCmd::Stop(args) => { let endpoint_id = &args.endpoint_id; let endpoint = cplane diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 4c569d7005..20dcf85562 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -937,7 +937,8 @@ impl Endpoint { | ComputeStatus::Configuration | ComputeStatus::TerminationPendingFast | ComputeStatus::TerminationPendingImmediate - | ComputeStatus::Terminated => { + | ComputeStatus::Terminated + | ComputeStatus::RefreshConfigurationPending => { bail!("unexpected compute status: {:?}", state.status) } } @@ -960,6 +961,29 @@ impl Endpoint { Ok(()) } + // Update the pageservers in the spec file of the endpoint. This is useful to test the spec refresh scenario. + pub async fn update_pageservers_in_config( + &self, + pageservers: Vec<(PageserverProtocol, Host, u16)>, + ) -> Result<()> { + let config_path = self.endpoint_path().join("config.json"); + let mut config: ComputeConfig = { + let file = std::fs::File::open(&config_path)?; + serde_json::from_reader(file)? + }; + + let pageserver_connstring = Self::build_pageserver_connstr(&pageservers); + assert!(!pageserver_connstring.is_empty()); + let mut spec = config.spec.unwrap(); + spec.pageserver_connstring = Some(pageserver_connstring); + config.spec = Some(spec); + + let file = std::fs::File::create(&config_path)?; + serde_json::to_writer_pretty(file, &config)?; + + Ok(()) + } + // Call the /status HTTP API pub async fn get_status(&self) -> Result { let client = reqwest::Client::new(); @@ -1125,6 +1149,33 @@ impl Endpoint { Ok(response) } + pub async fn refresh_configuration(&self) -> Result<()> { + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(30)) + .build() + .unwrap(); + let response = client + .post(format!( + "http://{}:{}/refresh_configuration", + self.internal_http_address.ip(), + self.internal_http_address.port() + )) + .send() + .await?; + + let status = response.status(); + if !(status.is_client_error() || status.is_server_error()) { + Ok(()) + } else { + let url = response.url().to_owned(); + let msg = match response.text().await { + Ok(err_body) => format!("Error: {err_body}"), + Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url), + }; + Err(anyhow::anyhow!(msg)) + } + } + pub fn connstr(&self, user: &str, db_name: &str) -> String { format!( "postgresql://{}@{}:{}/{}", diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs index 2ef1e6aab8..7efd94c76a 100644 --- a/libs/compute_api/src/responses.rs +++ b/libs/compute_api/src/responses.rs @@ -172,6 +172,8 @@ pub enum ComputeStatus { TerminationPendingImmediate, // Terminated Postgres Terminated, + // A spec refresh is being requested + RefreshConfigurationPending, } #[derive(Deserialize, Serialize)] @@ -193,6 +195,9 @@ impl Display for ComputeStatus { f.write_str("termination-pending-immediate") } ComputeStatus::Terminated => f.write_str("terminated"), + ComputeStatus::RefreshConfigurationPending => { + f.write_str("refresh-configuration-pending") + } } } } diff --git a/test_runner/fixtures/neon_cli.py b/test_runner/fixtures/neon_cli.py index 1931d6aaa5..d7634f24a4 100644 --- a/test_runner/fixtures/neon_cli.py +++ b/test_runner/fixtures/neon_cli.py @@ -633,6 +633,15 @@ class NeonLocalCli(AbstractNeonCli): args.extend(["--safekeepers", (",".join(map(str, safekeepers)))]) return self.raw_cli(args, check_return_code=check_return_code) + def endpoint_refresh_configuration( + self, + endpoint_id: str, + ) -> subprocess.CompletedProcess[str]: + args = ["endpoint", "refresh-configuration", endpoint_id] + res = self.raw_cli(args) + res.check_returncode() + return res + def endpoint_stop( self, endpoint_id: str, @@ -657,6 +666,22 @@ class NeonLocalCli(AbstractNeonCli): lsn: Lsn | None = None if lsn_str == "null" else Lsn(lsn_str) return lsn, proc + def endpoint_update_pageservers( + self, + endpoint_id: str, + pageserver_id: int | None = None, + ) -> subprocess.CompletedProcess[str]: + args = [ + "endpoint", + "update-pageservers", + endpoint_id, + ] + if pageserver_id is not None: + args.extend(["--pageserver-id", str(pageserver_id)]) + res = self.raw_cli(args) + res.check_returncode() + return res + def mappings_map_branch( self, name: str, tenant_id: TenantId, timeline_id: TimelineId ) -> subprocess.CompletedProcess[str]: diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index e7763de0e7..e02b3b12f8 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4940,6 +4940,10 @@ class Endpoint(PgProtocol, LogUtils): self.endpoint_id, self.tenant_id, pageserver_id, self.active_safekeepers ) + def refresh_configuration(self): + assert self.endpoint_id is not None + self.env.neon_cli.endpoint_refresh_configuration(self.endpoint_id) + def respec(self, **kwargs: Any) -> None: """Update the endpoint.json file used by control_plane.""" # Read config @@ -4986,6 +4990,10 @@ class Endpoint(PgProtocol, LogUtils): log.debug("Updating compute config to: %s", json.dumps(config, indent=4)) json.dump(config, file, indent=4) + def update_pageservers_in_config(self, pageserver_id: int | None = None): + assert self.endpoint_id is not None + self.env.neon_cli.endpoint_update_pageservers(self.endpoint_id, pageserver_id) + def wait_for_migrations(self, wait_for: int = NUM_COMPUTE_MIGRATIONS) -> None: """ Wait for all compute migrations to be ran. Remember that migrations only diff --git a/test_runner/regress/test_change_pageserver.py b/test_runner/regress/test_change_pageserver.py index b004db310c..bcdccac14e 100644 --- a/test_runner/regress/test_change_pageserver.py +++ b/test_runner/regress/test_change_pageserver.py @@ -3,14 +3,35 @@ from __future__ import annotations import asyncio from typing import TYPE_CHECKING +import pytest from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnvBuilder from fixtures.remote_storage import RemoteStorageKind if TYPE_CHECKING: - from fixtures.neon_fixtures import NeonEnvBuilder + from fixtures.neon_fixtures import Endpoint, NeonEnvBuilder -def test_change_pageserver(neon_env_builder: NeonEnvBuilder): +def reconfigure_endpoint(endpoint: Endpoint, pageserver_id: int, use_explicit_reconfigure: bool): + # It's important that we always update config.json before issuing any reconfigure requests + # to make sure that PG-initiated config refresh doesn't mess things up by reverting to the old config. + endpoint.update_pageservers_in_config(pageserver_id=pageserver_id) + + # PG will eventually automatically refresh its configuration if it detects connectivity issues with pageservers. + # We also allow the test to explicitly request a reconfigure so that the test can be sure that the + # endpoint is running with the latest configuration. + # + # Note that explicit reconfiguration is not required for the system to function or for this test to pass. + # It is kept for reference as this is how this test used to work before the capability of initiating + # configuration refreshes was added to compute nodes. + if use_explicit_reconfigure: + endpoint.reconfigure(pageserver_id=pageserver_id) + + +@pytest.mark.parametrize("use_explicit_reconfigure_for_failover", [False, True]) +def test_change_pageserver( + neon_env_builder: NeonEnvBuilder, use_explicit_reconfigure_for_failover: bool +): """ A relatively low level test of reconfiguring a compute's pageserver at runtime. Usually this is all done via the storage controller, but this test will disable the storage controller's compute @@ -72,7 +93,10 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder): execute("SELECT count(*) FROM foo") assert fetchone() == (100000,) - endpoint.reconfigure(pageserver_id=alt_pageserver_id) + # Reconfigure the endpoint to use the alt pageserver. We issue an explicit reconfigure request here + # regardless of test mode as this is testing the externally driven reconfiguration scenario, not the + # compute-initiated reconfiguration scenario upon detecting failures. + reconfigure_endpoint(endpoint, pageserver_id=alt_pageserver_id, use_explicit_reconfigure=True) # Verify that the neon.pageserver_connstring GUC is set to the correct thing execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'") @@ -100,6 +124,12 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder): env.storage_controller.node_configure(env.pageservers[1].id, {"availability": "Offline"}) env.storage_controller.reconcile_until_idle() + reconfigure_endpoint( + endpoint, + pageserver_id=env.pageservers[0].id, + use_explicit_reconfigure=use_explicit_reconfigure_for_failover, + ) + endpoint.reconfigure(pageserver_id=env.pageservers[0].id) execute("SELECT count(*) FROM foo") @@ -116,7 +146,11 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder): await asyncio.sleep( 1 ) # Sleep for 1 second just to make sure we actually started our count(*) query - endpoint.reconfigure(pageserver_id=env.pageservers[1].id) + reconfigure_endpoint( + endpoint, + pageserver_id=env.pageservers[1].id, + use_explicit_reconfigure=use_explicit_reconfigure_for_failover, + ) def execute_count(): execute("SELECT count(*) FROM FOO")