mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 00:42:54 +00:00
Support changing pageserver dynamically (#5542)
## Problem We currently require full restart of compute if we change the pageserver url ## Summary of changes Makes it so that we don't have to do a full restart, but can just send SIGHUP
This commit is contained in:
@@ -156,6 +156,7 @@ fn main() -> Result<()> {
|
||||
let path = Path::new(sp);
|
||||
let file = File::open(path)?;
|
||||
spec = Some(serde_json::from_reader(file)?);
|
||||
live_config_allowed = true;
|
||||
} else if let Some(id) = compute_id {
|
||||
if let Some(cp_base) = control_plane_uri {
|
||||
live_config_allowed = true;
|
||||
|
||||
@@ -798,6 +798,24 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
ep.start(&auth_token, safekeepers, remote_ext_config)?;
|
||||
}
|
||||
}
|
||||
"reconfigure" => {
|
||||
let endpoint_id = sub_args
|
||||
.get_one::<String>("endpoint_id")
|
||||
.ok_or_else(|| anyhow!("No endpoint ID provided to reconfigure"))?;
|
||||
let endpoint = cplane
|
||||
.endpoints
|
||||
.get(endpoint_id.as_str())
|
||||
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
|
||||
let pageserver_id =
|
||||
if let Some(id_str) = sub_args.get_one::<String>("endpoint-pageserver-id") {
|
||||
Some(NodeId(
|
||||
id_str.parse().context("while parsing pageserver id")?,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
endpoint.reconfigure(pageserver_id)?;
|
||||
}
|
||||
"stop" => {
|
||||
let endpoint_id = sub_args
|
||||
.get_one::<String>("endpoint_id")
|
||||
@@ -1369,6 +1387,12 @@ fn cli() -> Command {
|
||||
.arg(safekeepers_arg)
|
||||
.arg(remote_ext_config_args)
|
||||
)
|
||||
.subcommand(Command::new("reconfigure")
|
||||
.about("Reconfigure the endpoint")
|
||||
.arg(endpoint_pageserver_id_arg)
|
||||
.arg(endpoint_id_arg.clone())
|
||||
.arg(tenant_id_arg.clone())
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("stop")
|
||||
.arg(endpoint_id_arg)
|
||||
|
||||
@@ -414,18 +414,34 @@ impl Endpoint {
|
||||
);
|
||||
}
|
||||
|
||||
// Also wait for the compute_ctl process to die. It might have some cleanup
|
||||
// work to do after postgres stops, like syncing safekeepers, etc.
|
||||
//
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn wait_for_compute_ctl_to_exit(&self) -> Result<()> {
|
||||
// TODO use background_process::stop_process instead
|
||||
let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
|
||||
let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
|
||||
let pid = nix::unistd::Pid::from_raw(pid as i32);
|
||||
crate::background_process::wait_until_stopped("compute_ctl", pid)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_postgresql_conf(&self) -> Result<String> {
|
||||
// Slurp the endpoints/<endpoint id>/postgresql.conf file into
|
||||
// memory. We will include it in the spec file that we pass to
|
||||
// `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
|
||||
// in the data directory.
|
||||
let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
|
||||
match std::fs::read(&postgresql_conf_path) {
|
||||
Ok(content) => Ok(String::from_utf8(content)?),
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok("".to_string()),
|
||||
Err(e) => Err(anyhow::Error::new(e).context(format!(
|
||||
"failed to read config file in {}",
|
||||
postgresql_conf_path.to_str().unwrap()
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start(
|
||||
&self,
|
||||
auth_token: &Option<String>,
|
||||
@@ -436,21 +452,7 @@ impl Endpoint {
|
||||
anyhow::bail!("The endpoint is already running");
|
||||
}
|
||||
|
||||
// Slurp the endpoints/<endpoint id>/postgresql.conf file into
|
||||
// memory. We will include it in the spec file that we pass to
|
||||
// `compute_ctl`, and `compute_ctl` will write it to the postgresql.conf
|
||||
// in the data directory.
|
||||
let postgresql_conf_path = self.endpoint_path().join("postgresql.conf");
|
||||
let postgresql_conf = match std::fs::read(&postgresql_conf_path) {
|
||||
Ok(content) => String::from_utf8(content)?,
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => "".to_string(),
|
||||
Err(e) => {
|
||||
return Err(anyhow::Error::new(e).context(format!(
|
||||
"failed to read config file in {}",
|
||||
postgresql_conf_path.to_str().unwrap()
|
||||
)))
|
||||
}
|
||||
};
|
||||
let postgresql_conf = self.read_postgresql_conf()?;
|
||||
|
||||
// We always start the compute node from scratch, so if the Postgres
|
||||
// data dir exists from a previous launch, remove it first.
|
||||
@@ -621,6 +623,61 @@ impl Endpoint {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn reconfigure(&self, pageserver_id: Option<NodeId>) -> Result<()> {
|
||||
let mut spec: ComputeSpec = {
|
||||
let spec_path = self.endpoint_path().join("spec.json");
|
||||
let file = std::fs::File::open(spec_path)?;
|
||||
serde_json::from_reader(file)?
|
||||
};
|
||||
|
||||
let postgresql_conf = self.read_postgresql_conf()?;
|
||||
spec.cluster.postgresql_conf = Some(postgresql_conf);
|
||||
|
||||
if let Some(pageserver_id) = pageserver_id {
|
||||
let endpoint_config_path = self.endpoint_path().join("endpoint.json");
|
||||
let mut endpoint_conf: EndpointConf = {
|
||||
let file = std::fs::File::open(&endpoint_config_path)?;
|
||||
serde_json::from_reader(file)?
|
||||
};
|
||||
endpoint_conf.pageserver_id = pageserver_id;
|
||||
std::fs::write(
|
||||
endpoint_config_path,
|
||||
serde_json::to_string_pretty(&endpoint_conf)?,
|
||||
)?;
|
||||
|
||||
let pageserver =
|
||||
PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?);
|
||||
let ps_http_conf = &pageserver.pg_connection_config;
|
||||
let (host, port) = (ps_http_conf.host(), ps_http_conf.port());
|
||||
spec.pageserver_connstring = Some(format!("postgresql://no_user@{host}:{port}"));
|
||||
}
|
||||
|
||||
let client = reqwest::blocking::Client::new();
|
||||
let response = client
|
||||
.post(format!(
|
||||
"http://{}:{}/configure",
|
||||
self.http_address.ip(),
|
||||
self.http_address.port()
|
||||
))
|
||||
.body(format!(
|
||||
"{{\"spec\":{}}}",
|
||||
serde_json::to_string_pretty(&spec)?
|
||||
))
|
||||
.send()?;
|
||||
|
||||
let status = response.status();
|
||||
if !(status.is_client_error() || status.is_server_error()) {
|
||||
Ok(())
|
||||
} else {
|
||||
let url = response.url().to_owned();
|
||||
let msg = match response.text() {
|
||||
Ok(err_body) => format!("Error: {}", err_body),
|
||||
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
|
||||
};
|
||||
Err(anyhow::anyhow!(msg))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stop(&self, destroy: bool) -> Result<()> {
|
||||
// If we are going to destroy data directory,
|
||||
// use immediate shutdown mode, otherwise,
|
||||
@@ -629,15 +686,25 @@ impl Endpoint {
|
||||
// Postgres is always started from scratch, so stop
|
||||
// without destroy only used for testing and debugging.
|
||||
//
|
||||
self.pg_ctl(
|
||||
if destroy {
|
||||
&["-m", "immediate", "stop"]
|
||||
} else {
|
||||
&["stop"]
|
||||
},
|
||||
&None,
|
||||
)?;
|
||||
|
||||
// Also wait for the compute_ctl process to die. It might have some cleanup
|
||||
// work to do after postgres stops, like syncing safekeepers, etc.
|
||||
//
|
||||
self.wait_for_compute_ctl_to_exit()?;
|
||||
if destroy {
|
||||
self.pg_ctl(&["-m", "immediate", "stop"], &None)?;
|
||||
println!(
|
||||
"Destroying postgres data directory '{}'",
|
||||
self.pgdata().to_str().unwrap()
|
||||
);
|
||||
std::fs::remove_dir_all(self.endpoint_path())?;
|
||||
} else {
|
||||
self.pg_ctl(&["stop"], &None)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#include "access/xlog.h"
|
||||
#include "access/xlogutils.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "c.h"
|
||||
|
||||
#include "libpq-fe.h"
|
||||
#include "libpq/pqformat.h"
|
||||
@@ -63,6 +64,21 @@ int max_reconnect_attempts = 60;
|
||||
bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
|
||||
|
||||
static bool pageserver_flush(void);
|
||||
static void pageserver_disconnect(void);
|
||||
|
||||
|
||||
static pqsigfunc prev_signal_handler;
|
||||
|
||||
static void
|
||||
pageserver_sighup_handler(SIGNAL_ARGS)
|
||||
{
|
||||
if (prev_signal_handler)
|
||||
{
|
||||
prev_signal_handler(postgres_signal_arg);
|
||||
}
|
||||
neon_log(LOG, "Received SIGHUP, disconnecting pageserver. New pageserver connstring is %s", page_server_connstring);
|
||||
pageserver_disconnect();
|
||||
}
|
||||
|
||||
static bool
|
||||
pageserver_connect(int elevel)
|
||||
@@ -400,7 +416,7 @@ pg_init_libpagestore(void)
|
||||
NULL,
|
||||
&page_server_connstring,
|
||||
"",
|
||||
PGC_POSTMASTER,
|
||||
PGC_SIGHUP,
|
||||
0, /* no flags required */
|
||||
NULL, NULL, NULL);
|
||||
|
||||
@@ -482,5 +498,8 @@ pg_init_libpagestore(void)
|
||||
old_redo_read_buffer_filter = redo_read_buffer_filter;
|
||||
redo_read_buffer_filter = neon_redo_read_buffer_filter;
|
||||
}
|
||||
|
||||
prev_signal_handler = pqsignal(SIGHUP, pageserver_sighup_handler);
|
||||
|
||||
lfc_init();
|
||||
}
|
||||
|
||||
@@ -1445,6 +1445,20 @@ class NeonCli(AbstractNeonCli):
|
||||
res.check_returncode()
|
||||
return res
|
||||
|
||||
def endpoint_reconfigure(
|
||||
self,
|
||||
endpoint_id: str,
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
check_return_code=True,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
args = ["endpoint", "reconfigure", endpoint_id]
|
||||
if tenant_id is not None:
|
||||
args.extend(["--tenant-id", str(tenant_id)])
|
||||
if pageserver_id is not None:
|
||||
args.extend(["--pageserver-id", str(pageserver_id)])
|
||||
return self.raw_cli(args, check_return_code=check_return_code)
|
||||
|
||||
def endpoint_stop(
|
||||
self,
|
||||
endpoint_id: str,
|
||||
@@ -2534,6 +2548,10 @@ class Endpoint(PgProtocol):
|
||||
|
||||
return self
|
||||
|
||||
def reconfigure(self, pageserver_id: Optional[int] = None):
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_reconfigure(self.endpoint_id, self.tenant_id, pageserver_id)
|
||||
|
||||
def respec(self, **kwargs):
|
||||
"""Update the endpoint.json file used by control_plane."""
|
||||
# Read config
|
||||
|
||||
64
test_runner/regress/test_change_pageserver.py
Normal file
64
test_runner/regress/test_change_pageserver.py
Normal file
@@ -0,0 +1,64 @@
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.remote_storage import RemoteStorageKind
|
||||
|
||||
|
||||
def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_pageservers = 2
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.MOCK_S3,
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.neon_cli.create_branch("test_change_pageserver")
|
||||
endpoint = env.endpoints.create_start("test_change_pageserver")
|
||||
|
||||
alt_pageserver_id = env.pageservers[1].id
|
||||
env.pageservers[1].tenant_attach(env.initial_tenant)
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
# Create table, and insert some rows. Make it big enough that it doesn't fit in
|
||||
# shared_buffers, otherwise the SELECT after restart will just return answer
|
||||
# from shared_buffers without hitting the page server, which defeats the point
|
||||
# of this test.
|
||||
cur.execute("CREATE TABLE foo (t text)")
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO foo
|
||||
SELECT 'long string to consume some space' || g
|
||||
FROM generate_series(1, 100000) g
|
||||
"""
|
||||
)
|
||||
|
||||
# Verify that the table is larger than shared_buffers
|
||||
cur.execute(
|
||||
"""
|
||||
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
|
||||
from pg_settings where name = 'shared_buffers'
|
||||
"""
|
||||
)
|
||||
row = cur.fetchone()
|
||||
assert row is not None
|
||||
log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
|
||||
assert int(row[0]) < int(row[1])
|
||||
|
||||
cur.execute("SELECT count(*) FROM foo")
|
||||
assert cur.fetchone() == (100000,)
|
||||
|
||||
endpoint.reconfigure(pageserver_id=alt_pageserver_id)
|
||||
|
||||
# Verify that the neon.pageserver_connstring GUC is set to the correct thing
|
||||
cur.execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
|
||||
connstring = cur.fetchone()
|
||||
assert connstring is not None
|
||||
expected_connstring = f"postgresql://no_user:@localhost:{env.pageservers[1].service_port.pg}"
|
||||
assert expected_connstring == expected_connstring
|
||||
|
||||
env.pageservers[
|
||||
0
|
||||
].stop() # Stop the old pageserver just to make sure we're reading from the new one
|
||||
|
||||
cur.execute("SELECT count(*) FROM foo")
|
||||
assert cur.fetchone() == (100000,)
|
||||
Reference in New Issue
Block a user