mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 08:52:56 +00:00
Forward pageserver connection string from compute to safekeeper
This is needed for implementation of tenant rebalancing. With this change safekeeper becomes aware of which pageserver is supposed to be used for replication from this particular compute.
This commit is contained in:
committed by
Dmitry Rodionov
parent
bd34d7ecfc
commit
557e3024cd
@@ -199,17 +199,24 @@ impl PostgresNode {
|
||||
})
|
||||
}
|
||||
|
||||
fn sync_safekeepers(&self) -> Result<Lsn> {
|
||||
fn sync_safekeepers(&self, auth_token: &Option<String>) -> Result<Lsn> {
|
||||
let pg_path = self.env.pg_bin_dir().join("postgres");
|
||||
let sync_handle = Command::new(pg_path)
|
||||
.arg("--sync-safekeepers")
|
||||
let mut cmd = Command::new(&pg_path);
|
||||
|
||||
cmd.arg("--sync-safekeepers")
|
||||
.env_clear()
|
||||
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.env("PGDATA", self.pgdata().to_str().unwrap())
|
||||
.stdout(Stdio::piped())
|
||||
// Comment this to avoid capturing stderr (useful if command hangs)
|
||||
.stderr(Stdio::piped())
|
||||
.stderr(Stdio::piped());
|
||||
|
||||
if let Some(token) = auth_token {
|
||||
cmd.env("ZENITH_AUTH_TOKEN", token);
|
||||
}
|
||||
|
||||
let sync_handle = cmd
|
||||
.spawn()
|
||||
.expect("postgres --sync-safekeepers failed to start");
|
||||
|
||||
@@ -319,8 +326,11 @@ impl PostgresNode {
|
||||
} else {
|
||||
""
|
||||
};
|
||||
|
||||
format!("host={} port={} password={}", host, port, password)
|
||||
// NOTE avoiding spaces in connection string, because it is less error prone if we forward it somewhere.
|
||||
// Also note that not all parameters are supported here. Because in compute we substitute $ZENITH_AUTH_TOKEN
|
||||
// We parse this string and build it back with token from env var, and for simplicity rebuild
|
||||
// uses only needed variables namely host, port, user, password.
|
||||
format!("postgresql://no_user:{}@{}:{}", password, host, port)
|
||||
};
|
||||
conf.append("shared_preload_libraries", "zenith");
|
||||
conf.append_line("");
|
||||
@@ -358,7 +368,7 @@ impl PostgresNode {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn load_basebackup(&self) -> Result<()> {
|
||||
fn load_basebackup(&self, auth_token: &Option<String>) -> Result<()> {
|
||||
let backup_lsn = if let Some(lsn) = self.lsn {
|
||||
Some(lsn)
|
||||
} else if self.uses_wal_proposer {
|
||||
@@ -366,7 +376,7 @@ impl PostgresNode {
|
||||
// latest data from the pageserver. That is a bit clumsy but whole bootstrap
|
||||
// procedure evolves quite actively right now, so let's think about it again
|
||||
// when things would be more stable (TODO).
|
||||
let lsn = self.sync_safekeepers()?;
|
||||
let lsn = self.sync_safekeepers(auth_token)?;
|
||||
if lsn == Lsn(0) {
|
||||
None
|
||||
} else {
|
||||
@@ -417,7 +427,6 @@ impl PostgresNode {
|
||||
.env_clear()
|
||||
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap());
|
||||
|
||||
if let Some(token) = auth_token {
|
||||
cmd.env("ZENITH_AUTH_TOKEN", token);
|
||||
}
|
||||
@@ -451,7 +460,7 @@ impl PostgresNode {
|
||||
fs::write(&postgresql_conf_path, postgresql_conf)?;
|
||||
|
||||
// 3. Load basebackup
|
||||
self.load_basebackup()?;
|
||||
self.load_basebackup(auth_token)?;
|
||||
|
||||
if self.lsn.is_some() {
|
||||
File::create(self.pgdata().join("standby.signal"))?;
|
||||
|
||||
@@ -15,13 +15,11 @@ use reqwest::blocking::{Client, RequestBuilder, Response};
|
||||
use reqwest::{IntoUrl, Method};
|
||||
use thiserror::Error;
|
||||
use zenith_utils::http::error::HttpErrorBody;
|
||||
use zenith_utils::postgres_backend::AuthType;
|
||||
|
||||
use crate::local_env::{LocalEnv, SafekeeperConf};
|
||||
use crate::read_pidfile;
|
||||
use crate::storage::PageServerNode;
|
||||
use zenith_utils::connstring::connection_address;
|
||||
use zenith_utils::connstring::connection_host_port;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum SafekeeperHttpError {
|
||||
@@ -116,17 +114,6 @@ impl SafekeeperNode {
|
||||
);
|
||||
io::stdout().flush().unwrap();
|
||||
|
||||
// Configure connection to page server
|
||||
//
|
||||
// FIXME: We extract the host and port from the connection string instead of using
|
||||
// the connection string directly, because the 'safekeeper' binary expects
|
||||
// host:port format. That's a bit silly when we already have a full libpq connection
|
||||
// string at hand.
|
||||
let pageserver_conn = {
|
||||
let (host, port) = connection_host_port(&self.pageserver.pg_connection_config);
|
||||
format!("{}:{}", host, port)
|
||||
};
|
||||
|
||||
let listen_pg = format!("localhost:{}", self.conf.pg_port);
|
||||
let listen_http = format!("localhost:{}", self.conf.http_port);
|
||||
|
||||
@@ -134,7 +121,6 @@ impl SafekeeperNode {
|
||||
cmd.args(&["-D", self.datadir_path().to_str().unwrap()])
|
||||
.args(&["--listen-pg", &listen_pg])
|
||||
.args(&["--listen-http", &listen_http])
|
||||
.args(&["--pageserver", &pageserver_conn])
|
||||
.args(&["--recall", "1 second"])
|
||||
.arg("--daemonize")
|
||||
.env_clear()
|
||||
@@ -143,10 +129,6 @@ impl SafekeeperNode {
|
||||
cmd.arg("--no-sync");
|
||||
}
|
||||
|
||||
if self.env.pageserver.auth_type == AuthType::ZenithJWT {
|
||||
cmd.env("PAGESERVER_AUTH_TOKEN", &self.env.pageserver.auth_token);
|
||||
}
|
||||
|
||||
let var = "LLVM_PROFILE_FILE";
|
||||
if let Some(val) = std::env::var_os(var) {
|
||||
cmd.env(var, val);
|
||||
|
||||
@@ -340,6 +340,7 @@ class ProposerPostgres:
|
||||
"synchronous_standby_names = 'walproposer'\n",
|
||||
f"zenith.zenith_timeline = '{self.timeline_id}'\n",
|
||||
f"zenith.zenith_tenant = '{self.tenant_id}'\n",
|
||||
f"zenith.page_server_connstring = ''\n",
|
||||
f"wal_acceptors = '{wal_acceptors}'\n",
|
||||
])
|
||||
|
||||
|
||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: be8bdba074...e0d50b91ef
@@ -44,6 +44,9 @@ fn main() -> Result<()> {
|
||||
.takes_value(true)
|
||||
.help(formatcp!("http endpoint address for metrics on ip:port (default: {DEFAULT_HTTP_LISTEN_ADDR})")),
|
||||
)
|
||||
// FIXME this argument is no longer needed since pageserver address is forwarded from compute.
|
||||
// However because this argument is in use by console's e2e tests lets keep it for now and remove separately.
|
||||
// So currently it is a noop.
|
||||
.arg(
|
||||
Arg::with_name("pageserver")
|
||||
.short("p")
|
||||
@@ -101,10 +104,6 @@ fn main() -> Result<()> {
|
||||
conf.listen_http_addr = addr.to_owned();
|
||||
}
|
||||
|
||||
if let Some(addr) = arg_matches.value_of("pageserver") {
|
||||
conf.pageserver_addr = Some(addr.to_owned());
|
||||
}
|
||||
|
||||
if let Some(ttl) = arg_matches.value_of("ttl") {
|
||||
conf.ttl = Some(humantime::parse_duration(ttl)?);
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use std::env;
|
||||
use zenith_utils::zid::ZTimelineId;
|
||||
|
||||
pub mod http;
|
||||
@@ -40,9 +39,6 @@ pub struct SafeKeeperConf {
|
||||
pub no_sync: bool,
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_http_addr: String,
|
||||
pub pageserver_addr: Option<String>,
|
||||
// TODO (create issue) this is temporary, until protocol between PG<->SK<->PS rework
|
||||
pub pageserver_auth_token: Option<String>,
|
||||
pub ttl: Option<Duration>,
|
||||
pub recall_period: Option<Duration>,
|
||||
}
|
||||
@@ -62,12 +58,10 @@ impl Default for SafeKeeperConf {
|
||||
workdir: PathBuf::from("./"),
|
||||
daemonize: false,
|
||||
no_sync: false,
|
||||
pageserver_addr: None,
|
||||
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
|
||||
listen_http_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
|
||||
ttl: None,
|
||||
recall_period: None,
|
||||
pageserver_auth_token: env::var("PAGESERVER_AUTH_TOKEN").ok(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,20 +28,22 @@ pub struct ReceiveWalConn<'pg> {
|
||||
pg_backend: &'pg mut PostgresBackend,
|
||||
/// The cached result of `pg_backend.socket().peer_addr()` (roughly)
|
||||
peer_addr: SocketAddr,
|
||||
/// Pageserver connection string forwarded from compute
|
||||
/// NOTE that it is allowed to operate without a pageserver.
|
||||
/// So if compute has no pageserver configured do not use it.
|
||||
pageserver_connstr: Option<String>,
|
||||
}
|
||||
|
||||
///
|
||||
/// Periodically request pageserver to call back.
|
||||
/// If pageserver already has replication channel, it will just ignore this request
|
||||
///
|
||||
fn request_callback(conf: SafeKeeperConf, timelineid: ZTimelineId, tenantid: ZTenantId) {
|
||||
let ps_addr = conf.pageserver_addr.unwrap();
|
||||
let ps_connstr = format!(
|
||||
"postgresql://no_user:{}@{}/no_db",
|
||||
&conf.pageserver_auth_token.unwrap_or_default(),
|
||||
ps_addr
|
||||
);
|
||||
|
||||
fn request_callback(
|
||||
conf: SafeKeeperConf,
|
||||
pageserver_connstr: String,
|
||||
timelineid: ZTimelineId,
|
||||
tenantid: ZTenantId,
|
||||
) {
|
||||
// use Config parsing because SockAddr parsing doesnt allow to use host names instead of ip addresses
|
||||
let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_pg_addr);
|
||||
let me_conf: Config = me_connstr.parse().unwrap();
|
||||
@@ -54,15 +56,18 @@ fn request_callback(conf: SafeKeeperConf, timelineid: ZTimelineId, tenantid: ZTe
|
||||
loop {
|
||||
info!(
|
||||
"requesting page server to connect to us: start {} {}",
|
||||
ps_connstr, callme
|
||||
pageserver_connstr, callme
|
||||
);
|
||||
match Client::connect(&ps_connstr, NoTls) {
|
||||
match Client::connect(&pageserver_connstr, NoTls) {
|
||||
Ok(mut client) => {
|
||||
if let Err(e) = client.simple_query(&callme) {
|
||||
error!("Failed to send callme request to pageserver: {}", e);
|
||||
}
|
||||
}
|
||||
Err(e) => error!("Failed to connect to pageserver {}: {}", &ps_connstr, e),
|
||||
Err(e) => error!(
|
||||
"Failed to connect to pageserver {}: {}",
|
||||
&pageserver_connstr, e
|
||||
),
|
||||
}
|
||||
|
||||
if let Some(period) = conf.recall_period {
|
||||
@@ -74,11 +79,15 @@ fn request_callback(conf: SafeKeeperConf, timelineid: ZTimelineId, tenantid: ZTe
|
||||
}
|
||||
|
||||
impl<'pg> ReceiveWalConn<'pg> {
|
||||
pub fn new(pg: &'pg mut PostgresBackend) -> ReceiveWalConn<'pg> {
|
||||
pub fn new(
|
||||
pg: &'pg mut PostgresBackend,
|
||||
pageserver_connstr: Option<String>,
|
||||
) -> ReceiveWalConn<'pg> {
|
||||
let peer_addr = *pg.get_peer_addr();
|
||||
ReceiveWalConn {
|
||||
pg_backend: pg,
|
||||
peer_addr,
|
||||
pageserver_connstr,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -127,17 +136,17 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
_ => bail!("unexpected message {:?} instead of greeting", msg),
|
||||
}
|
||||
|
||||
// if requested, ask pageserver to fetch wal from us
|
||||
// xxx: this place seems not really fitting
|
||||
if swh.conf.pageserver_addr.is_some() {
|
||||
// Need to establish replication channel with page server.
|
||||
// Add far as replication in postgres is initiated by receiver, we should use callme mechanism
|
||||
// Need to establish replication channel with page server.
|
||||
// Add far as replication in postgres is initiated by receiver, we should use callme mechanism
|
||||
if let Some(ref pageserver_connstr) = self.pageserver_connstr {
|
||||
let conf = swh.conf.clone();
|
||||
let timelineid = swh.timeline.get().timelineid;
|
||||
// copy to safely move to a thread
|
||||
let pageserver_connstr = pageserver_connstr.to_owned();
|
||||
let _ = thread::Builder::new()
|
||||
.name("request_callback thread".into())
|
||||
.spawn(move || {
|
||||
request_callback(conf, timelineid, tenant_id);
|
||||
request_callback(conf, pageserver_connstr, timelineid, tenant_id);
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
@@ -46,6 +46,7 @@ impl postgres_backend::Handler for SendWalHandler {
|
||||
if let Some(app_name) = sm.params.get("application_name") {
|
||||
self.appname = Some(app_name.clone());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -75,7 +76,17 @@ impl postgres_backend::Handler for SendWalHandler {
|
||||
} else if query_string.starts_with(b"START_REPLICATION") {
|
||||
ReplicationConn::new(pgb).run(self, pgb, &query_string)?;
|
||||
} else if query_string.starts_with(b"START_WAL_PUSH") {
|
||||
ReceiveWalConn::new(pgb)
|
||||
// TODO: this repeats query decoding logic from page_service so it is probably
|
||||
// a good idea to refactor it in pgbackend and pass string to process query instead of bytes
|
||||
let decoded_query_string = match query_string.last() {
|
||||
Some(0) => std::str::from_utf8(&query_string[..query_string.len() - 1])?,
|
||||
_ => std::str::from_utf8(&query_string)?,
|
||||
};
|
||||
let pageserver_connstr = decoded_query_string
|
||||
.split_whitespace()
|
||||
.nth(1)
|
||||
.map(|s| s.to_owned());
|
||||
ReceiveWalConn::new(pgb, pageserver_connstr)
|
||||
.run(self)
|
||||
.with_context(|| "failed to run ReceiveWalConn")?;
|
||||
} else if query_string.starts_with(b"JSON_CTRL") {
|
||||
|
||||
Reference in New Issue
Block a user