diff --git a/compute_tools/README.md b/compute_tools/README.md
index 49f1368f0e..446b441c18 100644
--- a/compute_tools/README.md
+++ b/compute_tools/README.md
@@ -52,8 +52,14 @@ stateDiagram-v2
     Init --> Running : Started Postgres
     Running --> TerminationPendingFast : Requested termination
     Running --> TerminationPendingImmediate : Requested termination
+    Running --> ConfigurationPending : Received a /configure request with spec
+    Running --> RefreshConfigurationPending : Received a /refresh_configuration request, compute node will pull a new spec and reconfigure
+    RefreshConfigurationPending --> Running : Compute has been re-configured
     TerminationPendingFast --> Terminated : Terminated compute with 30s delay for cplane to inspect status
     TerminationPendingImmediate --> Terminated : Terminated compute immediately
+    Running --> TerminationPending : Requested termination
+    TerminationPending --> Terminated : Terminated compute
+    Failed --> RefreshConfigurationPending : Received a /refresh_configuration request
     Failed --> [*] : Compute exited
     Terminated --> [*] : Compute exited
 ```
diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs
index ee8a504429..83a2e6dc68 100644
--- a/compute_tools/src/bin/compute_ctl.rs
+++ b/compute_tools/src/bin/compute_ctl.rs
@@ -235,6 +235,9 @@ fn main() -> Result<()> {
             pg_isready_bin: get_pg_isready_bin(&cli.pgbin),
             instance_id: std::env::var("INSTANCE_ID").ok(),
             lakebase_mode: cli.lakebase_mode,
+            build_tag: BUILD_TAG.to_string(),
+            control_plane_uri: cli.control_plane_uri,
+            config_path_test_only: cli.config,
         },
         config,
     )?;
diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index 56bf7b8632..e3ac887e9c 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -21,6 +21,7 @@ use postgres::NoTls;
 use postgres::error::SqlState;
 use remote_storage::{DownloadError, RemotePath};
 use std::collections::{HashMap, HashSet};
+use std::ffi::OsString;
 use std::os::unix::fs::{PermissionsExt, symlink};
 use std::path::Path;
 use std::process::{Command, Stdio};
@@ -120,6 +121,10 @@ pub struct ComputeNodeParams {
     // Path to the `pg_isready` binary.
     pub pg_isready_bin: String,
     pub lakebase_mode: bool,
+
+    pub build_tag: String,
+    pub control_plane_uri: Option<String>,
+    pub config_path_test_only: Option<OsString>,
 }

 type TaskHandle = Mutex<Option<JoinHandle<()>>>;
@@ -1796,12 +1801,12 @@ impl ComputeNode {
         let states_allowing_configuration_refresh = [
             ComputeStatus::Running,
             ComputeStatus::Failed,
-            // ComputeStatus::RefreshConfigurationPending,
+            ComputeStatus::RefreshConfigurationPending,
         ];

-        let state = self.state.lock().expect("state lock poisoned");
+        let mut state = self.state.lock().expect("state lock poisoned");
         if states_allowing_configuration_refresh.contains(&state.status) {
-            // state.status = ComputeStatus::RefreshConfigurationPending;
+            state.status = ComputeStatus::RefreshConfigurationPending;
             self.state_changed.notify_all();
             Ok(())
         } else if state.status == ComputeStatus::Init {
@@ -1988,6 +1993,7 @@
             // wait
             ComputeStatus::Init
             | ComputeStatus::Configuration
+            | ComputeStatus::RefreshConfigurationPending
             | ComputeStatus::Empty => {
                 state = self.state_changed.wait(state).unwrap();
             }
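Reviewer note: `signal_refresh_configuration()` above only flips the status and notifies a condition variable; the actual work happens in the configurator thread that wakes up. A minimal, self-contained sketch of that signal/wait pattern (simplified names, not the real `ComputeNode` API):

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Status {
    Running,
    Failed,
    RefreshConfigurationPending,
}

struct Node {
    state: Mutex<Status>,
    state_changed: Condvar,
}

impl Node {
    // Mirrors the shape of signal_refresh_configuration(): only flip the status
    // if the current state allows a refresh, then wake up the configurator thread.
    fn signal_refresh(&self) -> Result<(), String> {
        let allowed = [Status::Running, Status::Failed, Status::RefreshConfigurationPending];
        let mut status = self.state.lock().unwrap();
        if allowed.contains(&*status) {
            *status = Status::RefreshConfigurationPending;
            self.state_changed.notify_all();
            Ok(())
        } else {
            Err(format!("cannot refresh from state {:?}", *status))
        }
    }
}

fn main() {
    let node = Arc::new(Node {
        state: Mutex::new(Status::Running),
        state_changed: Condvar::new(),
    });

    // Configurator-style worker: wait until a refresh is requested, then "reconfigure".
    let worker = {
        let node = Arc::clone(&node);
        thread::spawn(move || {
            let mut status = node.state.lock().unwrap();
            while *status != Status::RefreshConfigurationPending {
                status = node.state_changed.wait(status).unwrap();
            }
            *status = Status::Running; // pretend reconfiguration succeeded
            println!("reconfigured, back to {:?}", *status);
        })
    };

    thread::sleep(Duration::from_millis(50));
    node.signal_refresh().unwrap();
    worker.join().unwrap();
}
```

The real method additionally handles states outside the allow-list (e.g. `Init`), which is elided from the hunk above.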
diff --git a/compute_tools/src/configurator.rs b/compute_tools/src/configurator.rs
index d97bd37285..864335fd2c 100644
--- a/compute_tools/src/configurator.rs
+++ b/compute_tools/src/configurator.rs
@@ -1,10 +1,12 @@
-use std::sync::Arc;
+use std::fs::File;
 use std::thread;
+use std::{path::Path, sync::Arc};

-use compute_api::responses::ComputeStatus;
+use compute_api::responses::{ComputeConfig, ComputeStatus};
 use tracing::{error, info, instrument};

-use crate::compute::ComputeNode;
+use crate::compute::{ComputeNode, ParsedSpec};
+use crate::spec::get_config_from_control_plane;

 #[instrument(skip_all)]
 fn configurator_main_loop(compute: &Arc<ComputeNode>) {
@@ -12,12 +14,22 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
     loop {
         let mut state = compute.state.lock().unwrap();

-        // We have to re-check the status after re-acquiring the lock because it could be that
-        // the status has changed while we were waiting for the lock, and we might not need to
-        // wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
-        // we are waiting for a condition variable that will never be signaled.
-        if state.status != ComputeStatus::ConfigurationPending {
-            state = compute.state_changed.wait(state).unwrap();
+        if compute.params.lakebase_mode {
+            while state.status != ComputeStatus::ConfigurationPending
+                && state.status != ComputeStatus::RefreshConfigurationPending
+                && state.status != ComputeStatus::Failed
+            {
+                info!("configurator: compute status: {:?}, sleeping", state.status);
+                state = compute.state_changed.wait(state).unwrap();
+            }
+        } else {
+            // We have to re-check the status after re-acquiring the lock because it could be that
+            // the status has changed while we were waiting for the lock, and we might not need to
+            // wait on the condition variable. Otherwise, we might end up in some soft-/deadlock, i.e.
+            // we are waiting for a condition variable that will never be signaled.
+            if state.status != ComputeStatus::ConfigurationPending {
+                state = compute.state_changed.wait(state).unwrap();
+            }
         }

         // Re-check the status after waking up
@@ -38,6 +50,80 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
             // std::thread::sleep(std::time::Duration::from_millis(10000));

             compute.set_status(new_status);
+        } else if state.status == ComputeStatus::RefreshConfigurationPending {
+            info!(
+                "compute node suspects its configuration is out of date, now refreshing configuration"
+            );
+            // Drop the lock guard here to avoid holding the lock while downloading the spec from the control plane / HCC.
+            // This is the only thread that can move compute_ctl out of the `RefreshConfigurationPending` state, so it
+            // is safe to drop the lock like this.
+            drop(state);
+
+            let spec = if let Some(config_path) = &compute.params.config_path_test_only {
+                // This path is only to make testing easier. In production we always get the spec from the HCC.
+                info!(
+                    "reloading config.json from path: {}",
+                    config_path.to_string_lossy()
+                );
+                let path = Path::new(config_path);
+                if let Ok(file) = File::open(path) {
+                    match serde_json::from_reader::<_, ComputeConfig>(file) {
+                        Ok(config) => config.spec,
+                        Err(e) => {
+                            error!("could not parse spec file: {}", e);
+                            None
+                        }
+                    }
+                } else {
+                    error!(
+                        "could not open config file at path: {}",
+                        config_path.to_string_lossy()
+                    );
+                    None
+                }
+            } else if let Some(control_plane_uri) = &compute.params.control_plane_uri {
+                match get_config_from_control_plane(control_plane_uri, &compute.params.compute_id) {
+                    Ok(config) => config.spec,
+                    Err(e) => {
+                        error!("could not get config from control plane: {}", e);
+                        None
+                    }
+                }
+            } else {
+                None
+            };
+
+            if let Some(spec) = spec {
+                if let Ok(pspec) = ParsedSpec::try_from(spec) {
+                    {
+                        let mut state = compute.state.lock().unwrap();
+                        // Defensive programming to make sure this thread is indeed the only one that can move the compute
+                        // node out of the `RefreshConfigurationPending` state. Would be nice if we could encode this invariant
+                        // into the type system.
+                        assert_eq!(state.status, ComputeStatus::RefreshConfigurationPending);
+                        // state.pspec is consumed by compute.reconfigure() below. Note that compute.reconfigure() will acquire
+                        // the compute.state lock again, so we need to have the lock guard go out of scope here. We could add a
+                        // "locked" variant of compute.reconfigure() that takes the lock guard as an argument to make this cleaner,
+                        // but it's not worth forking the codebase too much for this minor point alone right now.
+                        state.pspec = Some(pspec);
+                    }
+                    match compute.reconfigure() {
+                        Ok(_) => {
+                            info!("Refresh configuration: compute node configured");
+                            compute.set_status(ComputeStatus::Running);
+                        }
+                        Err(e) => {
+                            error!(
+                                "Refresh configuration: could not configure compute node: {}",
+                                e
+                            );
+                            // Leave the compute node in the `RefreshConfigurationPending` state if the configuration
+                            // was not successful. It should be okay to treat this situation the same as if the loop
+                            // hasn't executed yet, as long as the detection side keeps notifying.
+                        }
+                    }
+                }
+            }
         } else if state.status == ComputeStatus::Failed {
             info!("compute node is now in Failed state, exiting");
             break;
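Reviewer note: the test-only branch above reads `config.json` and deliberately swallows every error so the loop can simply try again on the next signal. A stripped-down sketch of that "log and return `None`" shape, using stand-in types rather than the real `ComputeConfig`/`ComputeSpec`:

```rust
use std::fs::File;
use std::path::Path;

use serde::Deserialize;
use tracing::error;

// Hypothetical stand-ins; the real types live in compute_api and carry many more fields.
#[derive(Deserialize)]
struct Config {
    spec: Option<Spec>,
}

#[derive(Deserialize)]
struct Spec {
    pageserver_connstring: Option<String>,
}

// Same error-handling shape as the test-only reload branch: any failure is
// logged and swallowed so the caller can retry on the next /refresh_configuration signal.
fn load_spec(path: &Path) -> Option<Spec> {
    let file = match File::open(path) {
        Ok(file) => file,
        Err(e) => {
            error!("could not open config file {}: {}", path.display(), e);
            return None;
        }
    };
    match serde_json::from_reader::<_, Config>(file) {
        Ok(config) => config.spec,
        Err(e) => {
            error!("could not parse config file {}: {}", path.display(), e);
            None
        }
    }
}
```

When this (or the real branch) yields `None`, the configurator leaves the status at `RefreshConfigurationPending`, so a later nudge can retry.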
diff --git a/compute_tools/src/http/routes/refresh_configuration.rs b/compute_tools/src/http/routes/refresh_configuration.rs
index d00f5a285a..512abaa0a6 100644
--- a/compute_tools/src/http/routes/refresh_configuration.rs
+++ b/compute_tools/src/http/routes/refresh_configuration.rs
@@ -7,28 +7,22 @@ use axum::{
     response::{IntoResponse, Response},
 };
 use http::StatusCode;
-use tracing::debug;

 use crate::compute::ComputeNode;
 // use crate::hadron_metrics::POSTGRES_PAGESTREAM_REQUEST_ERRORS;
 use crate::http::JsonResponse;

-// The /refresh_configuration POST method is used to nudge compute_ctl to pull a new spec
-// from the HCC and attempt to reconfigure Postgres with the new spec. The method does not wait
-// for the reconfiguration to complete. Rather, it simply delivers a signal that will cause
-// configuration to be reloaded in a best effort manner. Invocation of this method does not
-// guarantee that a reconfiguration will occur. The caller should consider keep sending this
-// request while it believes that the compute configuration is out of date.
+/// The /refresh_configuration POST method is used to nudge compute_ctl to pull a new spec
+/// from the HCC and attempt to reconfigure Postgres with the new spec. The method does not wait
+/// for the reconfiguration to complete. Rather, it simply delivers a signal that will cause
+/// the configuration to be reloaded in a best-effort manner. Invocation of this method does not
+/// guarantee that a reconfiguration will occur. The caller should keep sending this
+/// request while it believes that the compute configuration is out of date.
 pub(in crate::http) async fn refresh_configuration(
     State(compute): State<Arc<ComputeNode>>,
 ) -> Response {
-    debug!("serving /refresh_configuration POST request");
-    // POSTGRES_PAGESTREAM_REQUEST_ERRORS.inc();
     match compute.signal_refresh_configuration().await {
         Ok(_) => StatusCode::OK.into_response(),
-        Err(e) => {
-            tracing::error!("error handling /refresh_configuration request: {}", e);
-            JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e)
-        }
+        Err(e) => JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e),
     }
 }
diff --git a/compute_tools/src/http/server.rs b/compute_tools/src/http/server.rs
index 9901a6de07..2fd3121f4f 100644
--- a/compute_tools/src/http/server.rs
+++ b/compute_tools/src/http/server.rs
@@ -23,11 +23,11 @@ use super::{
     middleware::authorize::Authorize,
     routes::{
         check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
-        grants, insights, lfc, metrics, metrics_json, promote, status, terminate,
+        grants, hadron_liveness_probe, insights, lfc, metrics, metrics_json, promote,
+        refresh_configuration, status, terminate,
     },
 };
 use crate::compute::ComputeNode;
-use crate::http::routes::{hadron_liveness_probe, refresh_configuration};

 /// `compute_ctl` has two servers: internal and external. The internal server
 /// binds to the loopback interface and handles communication from clients on
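Reviewer note: because the handler only signals and returns, a 200 does not mean the new spec is applied. A caller that wants "refresh until fresh" semantics has to keep nudging, along the lines of this hypothetical sketch (the port and the `config_is_stale()` predicate are assumptions, not real APIs):

```rust
use std::time::Duration;

use anyhow::Result;

// Hypothetical caller-side loop for the best-effort contract in the doc comment:
// keep nudging compute_ctl until the configuration is believed to be up to date.
async fn nudge_until_fresh(config_is_stale: impl Fn() -> bool) -> Result<()> {
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(5))
        .build()?;
    while config_is_stale() {
        // A non-2xx response just means "not accepted right now" (e.g. the compute
        // is still initializing); retry after a short delay.
        let _ = client
            .post("http://127.0.0.1:3081/refresh_configuration")
            .send()
            .await;
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    Ok(())
}
```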
diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs
index f68bc1ed48..c126835066 100644
--- a/control_plane/src/bin/neon_local.rs
+++ b/control_plane/src/bin/neon_local.rs
@@ -560,7 +560,9 @@ enum EndpointCmd {
     Create(EndpointCreateCmdArgs),
     Start(EndpointStartCmdArgs),
     Reconfigure(EndpointReconfigureCmdArgs),
+    RefreshConfiguration(EndpointRefreshConfigurationArgs),
     Stop(EndpointStopCmdArgs),
+    UpdatePageservers(EndpointUpdatePageserversCmdArgs),
     GenerateJwt(EndpointGenerateJwtCmdArgs),
 }

@@ -721,6 +723,13 @@ struct EndpointReconfigureCmdArgs {
     safekeepers: Option<String>,
 }

+#[derive(clap::Args)]
+#[clap(about = "Refresh the endpoint's configuration by forcing it to reload its spec")]
+struct EndpointRefreshConfigurationArgs {
+    #[clap(help = "Postgres endpoint id")]
+    endpoint_id: String,
+}
+
 #[derive(clap::Args)]
 #[clap(about = "Stop an endpoint")]
 struct EndpointStopCmdArgs {
@@ -738,6 +747,16 @@ struct EndpointStopCmdArgs {
     mode: EndpointTerminateMode,
 }

+#[derive(clap::Args)]
+#[clap(about = "Update the pageservers in the spec file of the compute endpoint")]
+struct EndpointUpdatePageserversCmdArgs {
+    #[clap(help = "Postgres endpoint id")]
+    endpoint_id: String,
+
+    #[clap(short = 'p', long, help = "Specified pageserver id")]
+    pageserver_id: Option<NodeId>,
+}
+
 #[derive(clap::Args)]
 #[clap(about = "Generate a JWT for an endpoint")]
 struct EndpointGenerateJwtCmdArgs {
@@ -1625,6 +1644,44 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
             println!("Starting existing endpoint {endpoint_id}...");
             endpoint.start(args).await?;
         }
+        EndpointCmd::UpdatePageservers(args) => {
+            let endpoint_id = &args.endpoint_id;
+            let endpoint = cplane
+                .endpoints
+                .get(endpoint_id.as_str())
+                .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
+            let pageservers = match args.pageserver_id {
+                Some(pageserver_id) => {
+                    let pageserver =
+                        PageServerNode::from_env(env, env.get_pageserver_conf(pageserver_id)?);
+
+                    vec![(
+                        PageserverProtocol::Libpq,
+                        pageserver.pg_connection_config.host().clone(),
+                        pageserver.pg_connection_config.port(),
+                    )]
+                }
+                None => {
+                    let storage_controller = StorageController::from_env(env);
+                    storage_controller
+                        .tenant_locate(endpoint.tenant_id)
+                        .await?
+                        .shards
+                        .into_iter()
+                        .map(|shard| {
+                            (
+                                PageserverProtocol::Libpq,
+                                Host::parse(&shard.listen_pg_addr)
+                                    .expect("Storage controller reported malformed host"),
+                                shard.listen_pg_port,
+                            )
+                        })
+                        .collect::<Vec<_>>()
+                }
+            };
+
+            endpoint.update_pageservers_in_config(pageservers).await?;
+        }
         EndpointCmd::Reconfigure(args) => {
             let endpoint_id = &args.endpoint_id;
             let endpoint = cplane
@@ -1678,6 +1735,14 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
                 .reconfigure(Some(pageservers), None, safekeepers, None)
                 .await?;
         }
+        EndpointCmd::RefreshConfiguration(args) => {
+            let endpoint_id = &args.endpoint_id;
+            let endpoint = cplane
+                .endpoints
+                .get(endpoint_id.as_str())
+                .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
+            endpoint.refresh_configuration().await?;
+        }
         EndpointCmd::Stop(args) => {
             let endpoint_id = &args.endpoint_id;
             let endpoint = cplane
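Reviewer note: taken together, the two new subcommands give a manual test loop: `endpoint update-pageservers <endpoint_id> [-p <pageserver_id>]` rewrites the endpoint's `config.json` (asking the storage controller for the current shard layout when `-p` is omitted), and `endpoint refresh-configuration <endpoint_id>` then nudges the running compute_ctl to pick the new spec up. The exact binary invocation (e.g. via `cargo neon`) depends on the local setup.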
diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs
index 4c569d7005..20dcf85562 100644
--- a/control_plane/src/endpoint.rs
+++ b/control_plane/src/endpoint.rs
@@ -937,7 +937,8 @@ impl Endpoint {
                 | ComputeStatus::Configuration
                 | ComputeStatus::TerminationPendingFast
                 | ComputeStatus::TerminationPendingImmediate
-                | ComputeStatus::Terminated => {
+                | ComputeStatus::Terminated
+                | ComputeStatus::RefreshConfigurationPending => {
                     bail!("unexpected compute status: {:?}", state.status)
                 }
             }
@@ -960,6 +961,29 @@ impl Endpoint {
         Ok(())
     }

+    // Update the pageservers in the spec file of the endpoint. This is useful to test the spec refresh scenario.
+    pub async fn update_pageservers_in_config(
+        &self,
+        pageservers: Vec<(PageserverProtocol, Host, u16)>,
+    ) -> Result<()> {
+        let config_path = self.endpoint_path().join("config.json");
+        let mut config: ComputeConfig = {
+            let file = std::fs::File::open(&config_path)?;
+            serde_json::from_reader(file)?
+        };
+
+        let pageserver_connstring = Self::build_pageserver_connstr(&pageservers);
+        assert!(!pageserver_connstring.is_empty());
+        let mut spec = config.spec.unwrap();
+        spec.pageserver_connstring = Some(pageserver_connstring);
+        config.spec = Some(spec);
+
+        let file = std::fs::File::create(&config_path)?;
+        serde_json::to_writer_pretty(file, &config)?;
+
+        Ok(())
+    }
+
     // Call the /status HTTP API
     pub async fn get_status(&self) -> Result<ComputeStatusResponse> {
         let client = reqwest::Client::new();
@@ -1125,6 +1149,33 @@ impl Endpoint {
         Ok(response)
     }

+    pub async fn refresh_configuration(&self) -> Result<()> {
+        let client = reqwest::Client::builder()
+            .timeout(Duration::from_secs(30))
+            .build()
+            .unwrap();
+        let response = client
+            .post(format!(
+                "http://{}:{}/refresh_configuration",
+                self.internal_http_address.ip(),
+                self.internal_http_address.port()
+            ))
+            .send()
+            .await?;
+
+        let status = response.status();
+        if !(status.is_client_error() || status.is_server_error()) {
+            Ok(())
+        } else {
+            let url = response.url().to_owned();
+            let msg = match response.text().await {
+                Ok(err_body) => format!("Error: {err_body}"),
+                Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
+            };
+            Err(anyhow::anyhow!(msg))
+        }
+    }
+
     pub fn connstr(&self, user: &str, db_name: &str) -> String {
         format!(
             "postgresql://{}@{}:{}/{}",
diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs
index 2ef1e6aab8..7efd94c76a 100644
--- a/libs/compute_api/src/responses.rs
+++ b/libs/compute_api/src/responses.rs
@@ -172,6 +172,8 @@ pub enum ComputeStatus {
     TerminationPendingImmediate,
     // Terminated Postgres
     Terminated,
+    // A spec refresh is being requested
+    RefreshConfigurationPending,
 }

 #[derive(Deserialize, Serialize)]
@@ -193,6 +195,9 @@ impl Display for ComputeStatus {
                 f.write_str("termination-pending-immediate")
             }
             ComputeStatus::Terminated => f.write_str("terminated"),
+            ComputeStatus::RefreshConfigurationPending => {
+                f.write_str("refresh-configuration-pending")
+            }
         }
     }
 }
diff --git a/test_runner/fixtures/neon_cli.py b/test_runner/fixtures/neon_cli.py
index 1931d6aaa5..d7634f24a4 100644
--- a/test_runner/fixtures/neon_cli.py
+++ b/test_runner/fixtures/neon_cli.py
@@ -633,6 +633,15 @@ class NeonLocalCli(AbstractNeonCli):
             args.extend(["--safekeepers", (",".join(map(str, safekeepers)))])
         return self.raw_cli(args, check_return_code=check_return_code)

+    def endpoint_refresh_configuration(
+        self,
+        endpoint_id: str,
+    ) -> subprocess.CompletedProcess[str]:
+        args = ["endpoint", "refresh-configuration", endpoint_id]
+        res = self.raw_cli(args)
+        res.check_returncode()
+        return res
+
     def endpoint_stop(
         self,
         endpoint_id: str,
@@ -657,6 +666,22 @@ class NeonLocalCli(AbstractNeonCli):
         lsn: Lsn | None = None if lsn_str == "null" else Lsn(lsn_str)
         return lsn, proc

+    def endpoint_update_pageservers(
+        self,
+        endpoint_id: str,
+        pageserver_id: int | None = None,
+    ) -> subprocess.CompletedProcess[str]:
+        args = [
+            "endpoint",
+            "update-pageservers",
+            endpoint_id,
+        ]
+        if pageserver_id is not None:
+            args.extend(["--pageserver-id", str(pageserver_id)])
+        res = self.raw_cli(args)
+        res.check_returncode()
+        return res
+
     def mappings_map_branch(
         self, name: str, tenant_id: TenantId, timeline_id: TimelineId
     ) -> subprocess.CompletedProcess[str]:
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index e7763de0e7..e02b3b12f8 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -4940,6 +4940,10 @@ class Endpoint(PgProtocol, LogUtils):
             self.endpoint_id, self.tenant_id, pageserver_id, self.active_safekeepers
         )

+    def refresh_configuration(self):
+        assert self.endpoint_id is not None
+        self.env.neon_cli.endpoint_refresh_configuration(self.endpoint_id)
+
     def respec(self, **kwargs: Any) -> None:
         """Update the endpoint.json file used by control_plane."""
         # Read config
@@ -4986,6 +4990,10 @@ class Endpoint(PgProtocol, LogUtils):
         log.debug("Updating compute config to: %s", json.dumps(config, indent=4))
         json.dump(config, file, indent=4)

+    def update_pageservers_in_config(self, pageserver_id: int | None = None):
+        assert self.endpoint_id is not None
+        self.env.neon_cli.endpoint_update_pageservers(self.endpoint_id, pageserver_id)
+
     def wait_for_migrations(self, wait_for: int = NUM_COMPUTE_MIGRATIONS) -> None:
         """
         Wait for all compute migrations to be ran. Remember that migrations only
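Reviewer note: most of the Rust churn outside compute_tools comes from the new enum variant itself: every exhaustive `match` on `ComputeStatus` (the `Display` impl above, the status check in control_plane/src/endpoint.rs, the wait arm in compute.rs) must now name `RefreshConfigurationPending`. A stand-in illustration of why the compiler forces those edits (not the real compute_api type):

```rust
enum Status {
    Running,
    Terminated,
    RefreshConfigurationPending,
}

fn as_str(s: &Status) -> &'static str {
    match s {
        Status::Running => "running",
        Status::Terminated => "terminated",
        // Omitting this arm is a compile error, which is how the compiler points
        // at every call site that needs updating when a variant is added.
        Status::RefreshConfigurationPending => "refresh-configuration-pending",
    }
}

fn main() {
    assert_eq!(
        as_str(&Status::RefreshConfigurationPending),
        "refresh-configuration-pending"
    );
}
```

The Python fixture methods, by contrast, are thin wrappers that shell out to the new `neon_local` subcommands, so the regression tests exercise the same code paths as a manual CLI invocation.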
diff --git a/test_runner/regress/test_change_pageserver.py b/test_runner/regress/test_change_pageserver.py
index b004db310c..bcdccac14e 100644
--- a/test_runner/regress/test_change_pageserver.py
+++ b/test_runner/regress/test_change_pageserver.py
@@ -3,14 +3,35 @@ from __future__ import annotations
 import asyncio
 from typing import TYPE_CHECKING

+import pytest
 from fixtures.log_helper import log
+from fixtures.neon_fixtures import NeonEnvBuilder
 from fixtures.remote_storage import RemoteStorageKind

 if TYPE_CHECKING:
-    from fixtures.neon_fixtures import NeonEnvBuilder
+    from fixtures.neon_fixtures import Endpoint, NeonEnvBuilder


-def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
+def reconfigure_endpoint(endpoint: Endpoint, pageserver_id: int, use_explicit_reconfigure: bool):
+    # It's important that we always update config.json before issuing any reconfigure requests
+    # to make sure that PG-initiated config refresh doesn't mess things up by reverting to the old config.
+    endpoint.update_pageservers_in_config(pageserver_id=pageserver_id)
+
+    # PG will eventually automatically refresh its configuration if it detects connectivity issues with pageservers.
+    # We also allow the test to explicitly request a reconfigure so that the test can be sure that the
+    # endpoint is running with the latest configuration.
+    #
+    # Note that explicit reconfiguration is not required for the system to function or for this test to pass.
+    # It is kept for reference as this is how this test used to work before the capability of initiating
+    # configuration refreshes was added to compute nodes.
+    if use_explicit_reconfigure:
+        endpoint.reconfigure(pageserver_id=pageserver_id)
+
+
+@pytest.mark.parametrize("use_explicit_reconfigure_for_failover", [False, True])
+def test_change_pageserver(
+    neon_env_builder: NeonEnvBuilder, use_explicit_reconfigure_for_failover: bool
+):
     """
     A relatively low level test of reconfiguring a compute's pageserver at runtime. Usually this is
     all done via the storage controller, but this test will disable the storage controller's compute
@@ -72,7 +93,10 @@ def test_change_pageserver(
     execute("SELECT count(*) FROM foo")
     assert fetchone() == (100000,)

-    endpoint.reconfigure(pageserver_id=alt_pageserver_id)
+    # Reconfigure the endpoint to use the alt pageserver. We issue an explicit reconfigure request here
+    # regardless of test mode as this is testing the externally driven reconfiguration scenario, not the
+    # compute-initiated reconfiguration scenario upon detecting failures.
+    reconfigure_endpoint(endpoint, pageserver_id=alt_pageserver_id, use_explicit_reconfigure=True)

     # Verify that the neon.pageserver_connstring GUC is set to the correct thing
     execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
@@ -100,6 +124,12 @@ def test_change_pageserver(
     env.storage_controller.node_configure(env.pageservers[1].id, {"availability": "Offline"})
     env.storage_controller.reconcile_until_idle()

+    reconfigure_endpoint(
+        endpoint,
+        pageserver_id=env.pageservers[0].id,
+        use_explicit_reconfigure=use_explicit_reconfigure_for_failover,
+    )
+
     endpoint.reconfigure(pageserver_id=env.pageservers[0].id)

     execute("SELECT count(*) FROM foo")
@@ -116,7 +146,11 @@ def test_change_pageserver(
         await asyncio.sleep(
             1
         )  # Sleep for 1 second just to make sure we actually started our count(*) query
-    endpoint.reconfigure(pageserver_id=env.pageservers[1].id)
+    reconfigure_endpoint(
+        endpoint,
+        pageserver_id=env.pageservers[1].id,
+        use_explicit_reconfigure=use_explicit_reconfigure_for_failover,
+    )

     def execute_count():
         execute("SELECT count(*) FROM FOO")
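Reviewer note: the patch wires up only the reconfiguration side. The detection side that would call POST /refresh_configuration when Postgres keeps failing to reach its pageservers (hinted at by the commented-out `POSTGRES_PAGESTREAM_REQUEST_ERRORS` counter and the test comments above) is not included here. Purely as an illustration of what such a caller could look like, with every name, threshold, and URL below being an assumption:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

// Hypothetical error counter bumped by the pageserver request path.
static PAGESERVER_REQUEST_ERRORS: AtomicU64 = AtomicU64::new(0);

// Hypothetical watchdog: if errors accumulated since the last tick, assume the
// spec may be stale (e.g. a pageserver moved) and nudge compute_ctl to re-pull it.
async fn refresh_watchdog(http_base: String) {
    let client = reqwest::Client::new();
    let mut last_seen = 0u64;
    loop {
        tokio::time::sleep(Duration::from_secs(10)).await;
        let now = PAGESERVER_REQUEST_ERRORS.load(Ordering::Relaxed);
        if now > last_seen {
            // Best effort, mirroring the endpoint's contract: failures are ignored
            // and the next tick will try again.
            let _ = client
                .post(format!("{http_base}/refresh_configuration"))
                .send()
                .await;
        }
        last_seen = now;
    }
}
```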