diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs
index 2fe07c4287..04ac077c7b 100644
--- a/compute_tools/src/bin/compute_ctl.rs
+++ b/compute_tools/src/bin/compute_ctl.rs
@@ -156,6 +156,7 @@ fn main() -> Result<()> {
         let path = Path::new(sp);
         let file = File::open(path)?;
         spec = Some(serde_json::from_reader(file)?);
+        live_config_allowed = true;
     } else if let Some(id) = compute_id {
         if let Some(cp_base) = control_plane_uri {
             live_config_allowed = true;
diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs
index d43566d2df..bc68eeaa8a 100644
--- a/control_plane/src/bin/neon_local.rs
+++ b/control_plane/src/bin/neon_local.rs
@@ -798,6 +798,24 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                 ep.start(&auth_token, safekeepers, remote_ext_config)?;
             }
         }
+        "reconfigure" => {
+            let endpoint_id = sub_args
+                .get_one::<String>("endpoint_id")
+                .ok_or_else(|| anyhow!("No endpoint ID provided to reconfigure"))?;
+            let endpoint = cplane
+                .endpoints
+                .get(endpoint_id.as_str())
+                .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
+            let pageserver_id =
+                if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
+                    Some(NodeId(
+                        id_str.parse().context("while parsing pageserver id")?,
+                    ))
+                } else {
+                    None
+                };
+            endpoint.reconfigure(pageserver_id)?;
+        }
         "stop" => {
             let endpoint_id = sub_args
                 .get_one::<String>("endpoint_id")
@@ -1369,6 +1387,12 @@ fn cli() -> Command {
                 .arg(safekeepers_arg)
                 .arg(remote_ext_config_args)
             )
+            .subcommand(Command::new("reconfigure")
+                .about("Reconfigure the endpoint")
+                .arg(endpoint_pageserver_id_arg)
+                .arg(endpoint_id_arg.clone())
+                .arg(tenant_id_arg.clone())
+            )
             .subcommand(
                 Command::new("stop")
                     .arg(endpoint_id_arg)
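Note on the `reconfigure` match arm added above: it turns the optional pageserver-id argument into an `Option<NodeId>` with an explicit `if let`/`else`. A minimal standalone sketch of the same step (stub types for illustration only, not the `neon_local` code; the real `NodeId` comes from the `utils` crate) shows how `Option::map` plus `transpose()` expresses it more compactly:

```rust
// Standalone sketch: parse an optional CLI string into Option<NodeId>.
// NodeId is stubbed here purely for illustration.
use anyhow::{Context, Result};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct NodeId(u64);

fn parse_pageserver_id(arg: Option<&str>) -> Result<Option<NodeId>> {
    arg.map(|id_str| {
        id_str
            .parse::<u64>()
            .map(NodeId)
            .context("while parsing pageserver id")
    })
    .transpose() // Option<Result<_>> -> Result<Option<_>>
}

fn main() -> Result<()> {
    assert_eq!(parse_pageserver_id(None)?, None);
    assert_eq!(parse_pageserver_id(Some("7"))?, Some(NodeId(7)));
    Ok(())
}
```

Either formulation behaves the same; the sketch is only meant to make the control flow of the new arm easier to follow.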
diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs
index acd9061664..cb16f48829 100644
--- a/control_plane/src/endpoint.rs
+++ b/control_plane/src/endpoint.rs
@@ -414,18 +414,34 @@ impl Endpoint {
             );
         }

-        // Also wait for the compute_ctl process to die. It might have some cleanup
-        // work to do after postgres stops, like syncing safekeepers, etc.
-        //
+        Ok(())
+    }
+
+    fn wait_for_compute_ctl_to_exit(&self) -> Result<()> {
         // TODO use background_process::stop_process instead
         let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
         let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
         let pid = nix::unistd::Pid::from_raw(pid as i32);
         crate::background_process::wait_until_stopped("compute_ctl", pid)?;
-
         Ok(())
     }

+    fn read_postgresql_conf(&self) -> Result<String> {
+        // Slurp the endpoints/<endpoint id>/postgresql.conf file into
+        // memory. We will include it in the spec file that we pass to
+        // `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
+        // in the data directory.
+        let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
+        match std::fs::read(&postgresql_conf_path) {
+            Ok(content) => Ok(String::from_utf8(content)?),
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok("".to_string()),
+            Err(e) => Err(anyhow::Error::new(e).context(format!(
+                "failed to read config file in {}",
+                postgresql_conf_path.to_str().unwrap()
+            ))),
+        }
+    }
+
     pub fn start(
         &self,
         auth_token: &Option<String>,
@@ -436,21 +452,7 @@ impl Endpoint {
             anyhow::bail!("The endpoint is already running");
         }

-        // Slurp the endpoints/<endpoint id>/postgresql.conf file into
-        // memory. We will include it in the spec file that we pass to
-        // `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
-        // in the data directory.
-        let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
-        let postgresql_conf = match std::fs::read(&postgresql_conf_path) {
-            Ok(content) => String::from_utf8(content)?,
-            Err(e) if e.kind() == std::io::ErrorKind::NotFound => "".to_string(),
-            Err(e) => {
-                return Err(anyhow::Error::new(e).context(format!(
-                    "failed to read config file in {}",
-                    postgresql_conf_path.to_str().unwrap()
-                )))
-            }
-        };
+        let postgresql_conf = self.read_postgresql_conf()?;

         // We always start the compute node from scratch, so if the Postgres
         // data dir exists from a previous launch, remove it first.
@@ -621,6 +623,61 @@
         }
     }

+    pub fn reconfigure(&self, pageserver_id: Option<NodeId>) -> Result<()> {
+        let mut spec: ComputeSpec = {
+            let spec_path = self.endpoint_path().join("spec.json");
+            let file = std::fs::File::open(spec_path)?;
+            serde_json::from_reader(file)?
+        };
+
+        let postgresql_conf = self.read_postgresql_conf()?;
+        spec.cluster.postgresql_conf = Some(postgresql_conf);
+
+        if let Some(pageserver_id) = pageserver_id {
+            let endpoint_config_path = self.endpoint_path().join("endpoint.json");
+            let mut endpoint_conf: EndpointConf = {
+                let file = std::fs::File::open(&endpoint_config_path)?;
+                serde_json::from_reader(file)?
+            };
+            endpoint_conf.pageserver_id = pageserver_id;
+            std::fs::write(
+                endpoint_config_path,
+                serde_json::to_string_pretty(&endpoint_conf)?,
+            )?;
+
+            let pageserver =
+                PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?);
+            let ps_http_conf = &pageserver.pg_connection_config;
+            let (host, port) = (ps_http_conf.host(), ps_http_conf.port());
+            spec.pageserver_connstring = Some(format!("postgresql://no_user@{host}:{port}"));
+        }
+
+        let client = reqwest::blocking::Client::new();
+        let response = client
+            .post(format!(
+                "http://{}:{}/configure",
+                self.http_address.ip(),
+                self.http_address.port()
+            ))
+            .body(format!(
+                "{{\"spec\":{}}}",
+                serde_json::to_string_pretty(&spec)?
+            ))
+            .send()?;
+
+        let status = response.status();
+        if !(status.is_client_error() || status.is_server_error()) {
+            Ok(())
+        } else {
+            let url = response.url().to_owned();
+            let msg = match response.text() {
+                Ok(err_body) => format!("Error: {}", err_body),
+                Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
+            };
+            Err(anyhow::anyhow!(msg))
+        }
+    }
+
     pub fn stop(&self, destroy: bool) -> Result<()> {
         // If we are going to destroy data directory,
         // use immediate shutdown mode, otherwise,
@@ -629,15 +686,25 @@
         // Postgres is always started from scratch, so stop
         // without destroy only used for testing and debugging.
         //
+        self.pg_ctl(
+            if destroy {
+                &["-m", "immediate", "stop"]
+            } else {
+                &["stop"]
+            },
+            &None,
+        )?;
+
+        // Also wait for the compute_ctl process to die. It might have some cleanup
+        // work to do after postgres stops, like syncing safekeepers, etc.
+        //
+        self.wait_for_compute_ctl_to_exit()?;
         if destroy {
-            self.pg_ctl(&["-m", "immediate", "stop"], &None)?;
             println!(
                 "Destroying postgres data directory '{}'",
                 self.pgdata().to_str().unwrap()
             );
             std::fs::remove_dir_all(self.endpoint_path())?;
-        } else {
-            self.pg_ctl(&["stop"], &None)?;
         }
         Ok(())
     }
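`Endpoint::reconfigure` above hand-builds the `/configure` request body with `format!("{{\"spec\":{}}}", ...)`. As a rough sketch of an alternative (stand-in types here, not the real `ComputeSpec` used by the control plane), the same `{"spec": ...}` shape can be produced by serializing a small wrapper struct, which sidesteps manual brace escaping if the payload ever grows more fields:

```rust
// Sketch with stand-in types: let serde emit the {"spec": ...} wrapper.
use serde::Serialize;

#[derive(Serialize)]
struct ComputeSpec {
    pageserver_connstring: Option<String>,
}

#[derive(Serialize)]
struct ConfigureRequest<'a> {
    spec: &'a ComputeSpec,
}

fn main() -> anyhow::Result<()> {
    let spec = ComputeSpec {
        // Hypothetical value; reconfigure() derives this from the target pageserver.
        pageserver_connstring: Some("postgresql://no_user@127.0.0.1:64000".to_string()),
    };
    let body = serde_json::to_string_pretty(&ConfigureRequest { spec: &spec })?;
    assert!(body.starts_with("{\n  \"spec\""));
    println!("{body}");
    Ok(())
}
```

The body compute_ctl receives is the same either way, since `serde_json::to_string_pretty` already yields valid JSON for the inner spec.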
diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c
index ca24ec7586..0a944a6bf0 100644
--- a/pgxn/neon/libpagestore.c
+++ b/pgxn/neon/libpagestore.c
@@ -19,6 +19,7 @@
 #include "access/xlog.h"
 #include "access/xlogutils.h"
 #include "storage/buf_internals.h"
+#include "c.h"

 #include "libpq-fe.h"
 #include "libpq/pqformat.h"
@@ -63,6 +64,21 @@ int max_reconnect_attempts = 60;
 bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;

 static bool pageserver_flush(void);
+static void pageserver_disconnect(void);
+
+
+static pqsigfunc prev_signal_handler;
+
+static void
+pageserver_sighup_handler(SIGNAL_ARGS)
+{
+    if (prev_signal_handler)
+    {
+        prev_signal_handler(postgres_signal_arg);
+    }
+    neon_log(LOG, "Received SIGHUP, disconnecting pageserver. New pageserver connstring is %s", page_server_connstring);
+    pageserver_disconnect();
+}

 static bool
 pageserver_connect(int elevel)
@@ -400,7 +416,7 @@ pg_init_libpagestore(void)
                            NULL,
                            &page_server_connstring,
                            "",
-                           PGC_POSTMASTER,
+                           PGC_SIGHUP,
                            0, /* no flags required */
                            NULL, NULL, NULL);

@@ -482,5 +498,8 @@ pg_init_libpagestore(void)
        old_redo_read_buffer_filter = redo_read_buffer_filter;
        redo_read_buffer_filter = neon_redo_read_buffer_filter;
    }
+
+   prev_signal_handler = pqsignal(SIGHUP, pageserver_sighup_handler);
+
    lfc_init();
 }
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index d1d19fa542..7a3e0a9efc 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -1445,6 +1445,20 @@ class NeonCli(AbstractNeonCli):
         res.check_returncode()
         return res

+    def endpoint_reconfigure(
+        self,
+        endpoint_id: str,
+        tenant_id: Optional[TenantId] = None,
+        pageserver_id: Optional[int] = None,
+        check_return_code=True,
+    ) -> "subprocess.CompletedProcess[str]":
+        args = ["endpoint", "reconfigure", endpoint_id]
+        if tenant_id is not None:
+            args.extend(["--tenant-id", str(tenant_id)])
+        if pageserver_id is not None:
+            args.extend(["--pageserver-id", str(pageserver_id)])
+        return self.raw_cli(args, check_return_code=check_return_code)
+
     def endpoint_stop(
         self,
         endpoint_id: str,
@@ -2534,6 +2548,10 @@ class Endpoint(PgProtocol):

         return self

+    def reconfigure(self, pageserver_id: Optional[int] = None):
+        assert self.endpoint_id is not None
+        self.env.neon_cli.endpoint_reconfigure(self.endpoint_id, self.tenant_id, pageserver_id)
+
     def respec(self, **kwargs):
         """Update the endpoint.json file used by control_plane."""
         # Read config
diff --git a/test_runner/regress/test_change_pageserver.py b/test_runner/regress/test_change_pageserver.py
new file mode 100644
index 0000000000..31092b70b9
--- /dev/null
+++ b/test_runner/regress/test_change_pageserver.py
@@ -0,0 +1,64 @@
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import NeonEnvBuilder
+from fixtures.remote_storage import RemoteStorageKind
+
+
+def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
+    neon_env_builder.num_pageservers = 2
+    neon_env_builder.enable_pageserver_remote_storage(
+        remote_storage_kind=RemoteStorageKind.MOCK_S3,
+    )
+    env = neon_env_builder.init_start()
+
+    env.neon_cli.create_branch("test_change_pageserver")
+    endpoint = env.endpoints.create_start("test_change_pageserver")
+
+    alt_pageserver_id = env.pageservers[1].id
+    env.pageservers[1].tenant_attach(env.initial_tenant)
+
+    pg_conn = endpoint.connect()
+    cur = pg_conn.cursor()
+
+    # Create table, and insert some rows. Make it big enough that it doesn't fit in
+    # shared_buffers, otherwise the SELECT after the pageserver switch will just return
+    # the answer from shared_buffers without hitting the page server, which defeats the
+    # point of this test.
+    cur.execute("CREATE TABLE foo (t text)")
+    cur.execute(
+        """
+        INSERT INTO foo
+            SELECT 'long string to consume some space' || g
+            FROM generate_series(1, 100000) g
+        """
+    )
+
+    # Verify that the table is larger than shared_buffers
+    cur.execute(
+        """
+        select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
+        from pg_settings where name = 'shared_buffers'
+        """
+    )
+    row = cur.fetchone()
+    assert row is not None
+    log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
+    assert int(row[0]) < int(row[1])
+
+    cur.execute("SELECT count(*) FROM foo")
+    assert cur.fetchone() == (100000,)
+
+    endpoint.reconfigure(pageserver_id=alt_pageserver_id)
+
+    # Verify that the neon.pageserver_connstring GUC is set to the correct thing
+    cur.execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
+    connstring = cur.fetchone()
+    assert connstring is not None
+    expected_connstring = f"postgresql://no_user:@localhost:{env.pageservers[1].service_port.pg}"
+    assert connstring[0] == expected_connstring
+
+    env.pageservers[
+        0
+    ].stop()  # Stop the old pageserver just to make sure we're reading from the new one
+
+    cur.execute("SELECT count(*) FROM foo")
+    assert cur.fetchone() == (100000,)