From 557e3024cd2059f37289b2cfb4f7ed82e022d658 Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Fri, 26 Nov 2021 15:09:10 +0300 Subject: [PATCH] Forward pageserver connection string from compute to safekeeper This is needed for implementation of tenant rebalancing. With this change safekeeper becomes aware of which pageserver is supposed to be used for replication from this particular compute. --- control_plane/src/compute.rs | 29 +++++++----- control_plane/src/safekeeper.rs | 18 -------- test_runner/batch_others/test_wal_acceptor.py | 1 + vendor/postgres | 2 +- walkeeper/src/bin/safekeeper.rs | 7 ++- walkeeper/src/lib.rs | 6 --- walkeeper/src/receive_wal.rs | 45 +++++++++++-------- walkeeper/src/send_wal.rs | 13 +++++- 8 files changed, 63 insertions(+), 58 deletions(-) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index af919ea05f..09ff3894a5 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -199,17 +199,24 @@ impl PostgresNode { }) } - fn sync_safekeepers(&self) -> Result { + fn sync_safekeepers(&self, auth_token: &Option) -> Result { let pg_path = self.env.pg_bin_dir().join("postgres"); - let sync_handle = Command::new(pg_path) - .arg("--sync-safekeepers") + let mut cmd = Command::new(&pg_path); + + cmd.arg("--sync-safekeepers") .env_clear() .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) .env("PGDATA", self.pgdata().to_str().unwrap()) .stdout(Stdio::piped()) // Comment this to avoid capturing stderr (useful if command hangs) - .stderr(Stdio::piped()) + .stderr(Stdio::piped()); + + if let Some(token) = auth_token { + cmd.env("ZENITH_AUTH_TOKEN", token); + } + + let sync_handle = cmd .spawn() .expect("postgres --sync-safekeepers failed to start"); @@ -319,8 +326,11 @@ impl PostgresNode { } else { "" }; - - format!("host={} port={} password={}", host, port, password) + // NOTE avoiding spaces in connection string, because 
it is less error prone if we forward it somewhere. + // Also note that not all parameters are supported here. Because in compute we substitute $ZENITH_AUTH_TOKEN + // We parse this string and build it back with token from env var, and for simplicity rebuild + // uses only needed variables namely host, port, user, password. + format!("postgresql://no_user:{}@{}:{}", password, host, port) }; conf.append("shared_preload_libraries", "zenith"); conf.append_line(""); @@ -358,7 +368,7 @@ impl PostgresNode { Ok(()) } - fn load_basebackup(&self) -> Result<()> { + fn load_basebackup(&self, auth_token: &Option) -> Result<()> { let backup_lsn = if let Some(lsn) = self.lsn { Some(lsn) } else if self.uses_wal_proposer { @@ -366,7 +376,7 @@ impl PostgresNode { // latest data from the pageserver. That is a bit clumsy but whole bootstrap // procedure evolves quite actively right now, so let's think about it again // when things would be more stable (TODO). - let lsn = self.sync_safekeepers()?; + let lsn = self.sync_safekeepers(auth_token)?; if lsn == Lsn(0) { None } else { @@ -417,7 +427,6 @@ impl PostgresNode { .env_clear() .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()); - if let Some(token) = auth_token { cmd.env("ZENITH_AUTH_TOKEN", token); } @@ -451,7 +460,7 @@ impl PostgresNode { fs::write(&postgresql_conf_path, postgresql_conf)?; // 3. 
Load basebackup - self.load_basebackup()?; + self.load_basebackup(auth_token)?; if self.lsn.is_some() { File::create(self.pgdata().join("standby.signal"))?; diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index fcbf840397..07d81a8358 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -15,13 +15,11 @@ use reqwest::blocking::{Client, RequestBuilder, Response}; use reqwest::{IntoUrl, Method}; use thiserror::Error; use zenith_utils::http::error::HttpErrorBody; -use zenith_utils::postgres_backend::AuthType; use crate::local_env::{LocalEnv, SafekeeperConf}; use crate::read_pidfile; use crate::storage::PageServerNode; use zenith_utils::connstring::connection_address; -use zenith_utils::connstring::connection_host_port; #[derive(Error, Debug)] pub enum SafekeeperHttpError { @@ -116,17 +114,6 @@ impl SafekeeperNode { ); io::stdout().flush().unwrap(); - // Configure connection to page server - // - // FIXME: We extract the host and port from the connection string instead of using - // the connection string directly, because the 'safekeeper' binary expects - // host:port format. That's a bit silly when we already have a full libpq connection - // string at hand. 
- let pageserver_conn = { - let (host, port) = connection_host_port(&self.pageserver.pg_connection_config); - format!("{}:{}", host, port) - }; - let listen_pg = format!("localhost:{}", self.conf.pg_port); let listen_http = format!("localhost:{}", self.conf.http_port); @@ -134,7 +121,6 @@ impl SafekeeperNode { cmd.args(&["-D", self.datadir_path().to_str().unwrap()]) .args(&["--listen-pg", &listen_pg]) .args(&["--listen-http", &listen_http]) - .args(&["--pageserver", &pageserver_conn]) .args(&["--recall", "1 second"]) .arg("--daemonize") .env_clear() @@ -143,10 +129,6 @@ impl SafekeeperNode { cmd.arg("--no-sync"); } - if self.env.pageserver.auth_type == AuthType::ZenithJWT { - cmd.env("PAGESERVER_AUTH_TOKEN", &self.env.pageserver.auth_token); - } - let var = "LLVM_PROFILE_FILE"; if let Some(val) = std::env::var_os(var) { cmd.env(var, val); diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index 120e6c769b..e5351912fa 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -340,6 +340,7 @@ class ProposerPostgres: "synchronous_standby_names = 'walproposer'\n", f"zenith.zenith_timeline = '{self.timeline_id}'\n", f"zenith.zenith_tenant = '{self.tenant_id}'\n", + f"zenith.page_server_connstring = ''\n", f"wal_acceptors = '{wal_acceptors}'\n", ]) diff --git a/vendor/postgres b/vendor/postgres index be8bdba074..e0d50b91ef 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit be8bdba074baf2a4c7f8fb2cc701c2b3fac9342f +Subproject commit e0d50b91efb77a5e161745b2ee943fa9adff27b4 diff --git a/walkeeper/src/bin/safekeeper.rs b/walkeeper/src/bin/safekeeper.rs index 0f06983574..0b6b14eb6d 100644 --- a/walkeeper/src/bin/safekeeper.rs +++ b/walkeeper/src/bin/safekeeper.rs @@ -44,6 +44,9 @@ fn main() -> Result<()> { .takes_value(true) .help(formatcp!("http endpoint address for metrics on ip:port (default: {DEFAULT_HTTP_LISTEN_ADDR})")), ) + // 
FIXME this argument is no longer needed since pageserver address is forwarded from compute. + // However because this argument is in use by console's e2e tests let's keep it for now and remove separately. + // So currently it is a noop. .arg( Arg::with_name("pageserver") .short("p") @@ -101,10 +104,6 @@ fn main() -> Result<()> { conf.listen_http_addr = addr.to_owned(); } - if let Some(addr) = arg_matches.value_of("pageserver") { - conf.pageserver_addr = Some(addr.to_owned()); - } - if let Some(ttl) = arg_matches.value_of("ttl") { conf.ttl = Some(humantime::parse_duration(ttl)?); } diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index bfb7c0cc82..a0060c3630 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -2,7 +2,6 @@ use std::path::PathBuf; use std::time::Duration; -use std::env; use zenith_utils::zid::ZTimelineId; pub mod http; @@ -40,9 +39,6 @@ pub struct SafeKeeperConf { pub no_sync: bool, pub listen_pg_addr: String, pub listen_http_addr: String, - pub pageserver_addr: Option, - // TODO (create issue) this is temporary, until protocol between PG<->SK<->PS rework - pub pageserver_auth_token: Option, pub ttl: Option, pub recall_period: Option, } @@ -62,12 +58,10 @@ impl Default for SafeKeeperConf { workdir: PathBuf::from("./"), daemonize: false, no_sync: false, - pageserver_addr: None, listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(), listen_http_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(), ttl: None, recall_period: None, - pageserver_auth_token: env::var("PAGESERVER_AUTH_TOKEN").ok(), } } } diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index a653c41922..a7b9797770 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -28,20 +28,22 @@ pub struct ReceiveWalConn<'pg> { pg_backend: &'pg mut PostgresBackend, /// The cached result of `pg_backend.socket().peer_addr()` (roughly) peer_addr: SocketAddr, + /// Pageserver connection string forwarded from compute + /// NOTE that it 
is allowed to operate without a pageserver. + /// So if compute has no pageserver configured do not use it. + pageserver_connstr: Option, } /// /// Periodically request pageserver to call back. /// If pageserver already has replication channel, it will just ignore this request /// -fn request_callback(conf: SafeKeeperConf, timelineid: ZTimelineId, tenantid: ZTenantId) { - let ps_addr = conf.pageserver_addr.unwrap(); - let ps_connstr = format!( - "postgresql://no_user:{}@{}/no_db", - &conf.pageserver_auth_token.unwrap_or_default(), - ps_addr - ); - +fn request_callback( + conf: SafeKeeperConf, + pageserver_connstr: String, + timelineid: ZTimelineId, + tenantid: ZTenantId, +) { // use Config parsing because SockAddr parsing doesnt allow to use host names instead of ip addresses let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_pg_addr); let me_conf: Config = me_connstr.parse().unwrap(); @@ -54,15 +56,18 @@ fn request_callback(conf: SafeKeeperConf, timelineid: ZTimelineId, tenantid: ZTe loop { info!( "requesting page server to connect to us: start {} {}", - ps_connstr, callme + pageserver_connstr, callme ); - match Client::connect(&ps_connstr, NoTls) { + match Client::connect(&pageserver_connstr, NoTls) { Ok(mut client) => { if let Err(e) = client.simple_query(&callme) { error!("Failed to send callme request to pageserver: {}", e); } } - Err(e) => error!("Failed to connect to pageserver {}: {}", &ps_connstr, e), + Err(e) => error!( + "Failed to connect to pageserver {}: {}", + &pageserver_connstr, e + ), } if let Some(period) = conf.recall_period { @@ -74,11 +79,15 @@ fn request_callback(conf: SafeKeeperConf, timelineid: ZTimelineId, tenantid: ZTe } impl<'pg> ReceiveWalConn<'pg> { - pub fn new(pg: &'pg mut PostgresBackend) -> ReceiveWalConn<'pg> { + pub fn new( + pg: &'pg mut PostgresBackend, + pageserver_connstr: Option, + ) -> ReceiveWalConn<'pg> { let peer_addr = *pg.get_peer_addr(); ReceiveWalConn { pg_backend: pg, peer_addr, + 
pageserver_connstr, } } @@ -127,17 +136,17 @@ impl<'pg> ReceiveWalConn<'pg> { _ => bail!("unexpected message {:?} instead of greeting", msg), } - // if requested, ask pageserver to fetch wal from us - // xxx: this place seems not really fitting - if swh.conf.pageserver_addr.is_some() { - // Need to establish replication channel with page server. - // Add far as replication in postgres is initiated by receiver, we should use callme mechanism + // Need to establish replication channel with page server. + // As far as replication in postgres is initiated by receiver, we should use callme mechanism + if let Some(ref pageserver_connstr) = self.pageserver_connstr { let conf = swh.conf.clone(); let timelineid = swh.timeline.get().timelineid; + // copy to safely move to a thread + let pageserver_connstr = pageserver_connstr.to_owned(); let _ = thread::Builder::new() .name("request_callback thread".into()) .spawn(move || { - request_callback(conf, timelineid, tenant_id); + request_callback(conf, pageserver_connstr, timelineid, tenant_id); }) .unwrap(); } diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index a97fed052b..a86d8d9305 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -46,6 +46,7 @@ impl postgres_backend::Handler for SendWalHandler { if let Some(app_name) = sm.params.get("application_name") { self.appname = Some(app_name.clone()); } + Ok(()) } @@ -75,7 +76,17 @@ impl postgres_backend::Handler for SendWalHandler { } else if query_string.starts_with(b"START_REPLICATION") { ReplicationConn::new(pgb).run(self, pgb, &query_string)?; } else if query_string.starts_with(b"START_WAL_PUSH") { - ReceiveWalConn::new(pgb) + // TODO: this repeats query decoding logic from page_service so it is probably + // a good idea to refactor it in pgbackend and pass string to process query instead of bytes + let decoded_query_string = match query_string.last() { + Some(0) => std::str::from_utf8(&query_string[..query_string.len() - 1])?, + _ => 
std::str::from_utf8(&query_string)?, + }; + let pageserver_connstr = decoded_query_string + .split_whitespace() + .nth(1) + .map(|s| s.to_owned()); + ReceiveWalConn::new(pgb, pageserver_connstr) .run(self) .with_context(|| "failed to run ReceiveWalConn")?; } else if query_string.starts_with(b"JSON_CTRL") {