diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index af919ea05f..09ff3894a5 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -199,17 +199,24 @@ impl PostgresNode { }) } - fn sync_safekeepers(&self) -> Result { + fn sync_safekeepers(&self, auth_token: &Option) -> Result { let pg_path = self.env.pg_bin_dir().join("postgres"); - let sync_handle = Command::new(pg_path) - .arg("--sync-safekeepers") + let mut cmd = Command::new(&pg_path); + + cmd.arg("--sync-safekeepers") .env_clear() .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) .env("PGDATA", self.pgdata().to_str().unwrap()) .stdout(Stdio::piped()) // Comment this to avoid capturing stderr (useful if command hangs) - .stderr(Stdio::piped()) + .stderr(Stdio::piped()); + + if let Some(token) = auth_token { + cmd.env("ZENITH_AUTH_TOKEN", token); + } + + let sync_handle = cmd .spawn() .expect("postgres --sync-safekeepers failed to start"); @@ -319,8 +326,11 @@ impl PostgresNode { } else { "" }; - - format!("host={} port={} password={}", host, port, password) + // NOTE avoiding spaces in connection string, because it is less error prone if we forward it somewhere. + // Also note that not all parameters are supported here. Because in compute we substitute $ZENITH_AUTH_TOKEN + // We parse this string and build it back with token from env var, and for simplicity rebuild + // uses only needed variables namely host, port, user, password. + format!("postgresql://no_user:{}@{}:{}", password, host, port) }; conf.append("shared_preload_libraries", "zenith"); conf.append_line(""); @@ -358,7 +368,7 @@ impl PostgresNode { Ok(()) } - fn load_basebackup(&self) -> Result<()> { + fn load_basebackup(&self, auth_token: &Option) -> Result<()> { let backup_lsn = if let Some(lsn) = self.lsn { Some(lsn) } else if self.uses_wal_proposer { @@ -366,7 +376,7 @@ impl PostgresNode { // latest data from the pageserver. That is a bit clumsy but whole bootstrap // procedure evolves quite actively right now, so let's think about it again // when things would be more stable (TODO). - let lsn = self.sync_safekeepers()?; + let lsn = self.sync_safekeepers(auth_token)?; if lsn == Lsn(0) { None } else { @@ -417,7 +427,6 @@ impl PostgresNode { .env_clear() .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()); - if let Some(token) = auth_token { cmd.env("ZENITH_AUTH_TOKEN", token); } @@ -451,7 +460,7 @@ impl PostgresNode { fs::write(&postgresql_conf_path, postgresql_conf)?; // 3. Load basebackup - self.load_basebackup()?; + self.load_basebackup(auth_token)?; if self.lsn.is_some() { File::create(self.pgdata().join("standby.signal"))?; diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index fcbf840397..07d81a8358 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -15,13 +15,11 @@ use reqwest::blocking::{Client, RequestBuilder, Response}; use reqwest::{IntoUrl, Method}; use thiserror::Error; use zenith_utils::http::error::HttpErrorBody; -use zenith_utils::postgres_backend::AuthType; use crate::local_env::{LocalEnv, SafekeeperConf}; use crate::read_pidfile; use crate::storage::PageServerNode; use zenith_utils::connstring::connection_address; -use zenith_utils::connstring::connection_host_port; #[derive(Error, Debug)] pub enum SafekeeperHttpError { @@ -116,17 +114,6 @@ impl SafekeeperNode { ); io::stdout().flush().unwrap(); - // Configure connection to page server - // - // FIXME: We extract the host and port from the connection string instead of using - // the connection string directly, because the 'safekeeper' binary expects - // host:port format. That's a bit silly when we already have a full libpq connection - // string at hand. - let pageserver_conn = { - let (host, port) = connection_host_port(&self.pageserver.pg_connection_config); - format!("{}:{}", host, port) - }; - let listen_pg = format!("localhost:{}", self.conf.pg_port); let listen_http = format!("localhost:{}", self.conf.http_port); @@ -134,7 +121,6 @@ impl SafekeeperNode { cmd.args(&["-D", self.datadir_path().to_str().unwrap()]) .args(&["--listen-pg", &listen_pg]) .args(&["--listen-http", &listen_http]) - .args(&["--pageserver", &pageserver_conn]) .args(&["--recall", "1 second"]) .arg("--daemonize") .env_clear() @@ -143,10 +129,6 @@ impl SafekeeperNode { cmd.arg("--no-sync"); } - if self.env.pageserver.auth_type == AuthType::ZenithJWT { - cmd.env("PAGESERVER_AUTH_TOKEN", &self.env.pageserver.auth_token); - } - let var = "LLVM_PROFILE_FILE"; if let Some(val) = std::env::var_os(var) { cmd.env(var, val); diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index 120e6c769b..e5351912fa 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -340,6 +340,7 @@ class ProposerPostgres: "synchronous_standby_names = 'walproposer'\n", f"zenith.zenith_timeline = '{self.timeline_id}'\n", f"zenith.zenith_tenant = '{self.tenant_id}'\n", + f"zenith.page_server_connstring = ''\n", f"wal_acceptors = '{wal_acceptors}'\n", ]) diff --git a/vendor/postgres b/vendor/postgres index be8bdba074..e0d50b91ef 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit be8bdba074baf2a4c7f8fb2cc701c2b3fac9342f +Subproject commit e0d50b91efb77a5e161745b2ee943fa9adff27b4 diff --git a/walkeeper/src/bin/safekeeper.rs b/walkeeper/src/bin/safekeeper.rs index 0f06983574..0b6b14eb6d 100644 --- a/walkeeper/src/bin/safekeeper.rs +++ b/walkeeper/src/bin/safekeeper.rs @@ -44,6 +44,9 @@ fn main() -> Result<()> { .takes_value(true) .help(formatcp!("http endpoint address for metrics on ip:port (default: {DEFAULT_HTTP_LISTEN_ADDR})")), ) + // FIXME this argument is no longer needed since pageserver address is forwarded from compute. + // However because this argument is in use by console's e2e tests lets keep it for now and remove separately. + // So currently it is a noop. .arg( Arg::with_name("pageserver") .short("p") @@ -101,10 +104,6 @@ fn main() -> Result<()> { conf.listen_http_addr = addr.to_owned(); } - if let Some(addr) = arg_matches.value_of("pageserver") { - conf.pageserver_addr = Some(addr.to_owned()); - } - if let Some(ttl) = arg_matches.value_of("ttl") { conf.ttl = Some(humantime::parse_duration(ttl)?); } diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index bfb7c0cc82..a0060c3630 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -2,7 +2,6 @@ use std::path::PathBuf; use std::time::Duration; -use std::env; use zenith_utils::zid::ZTimelineId; pub mod http; @@ -40,9 +39,6 @@ pub struct SafeKeeperConf { pub no_sync: bool, pub listen_pg_addr: String, pub listen_http_addr: String, - pub pageserver_addr: Option, - // TODO (create issue) this is temporary, until protocol between PG<->SK<->PS rework - pub pageserver_auth_token: Option, pub ttl: Option, pub recall_period: Option, } @@ -62,12 +58,10 @@ impl Default for SafeKeeperConf { workdir: PathBuf::from("./"), daemonize: false, no_sync: false, - pageserver_addr: None, listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(), listen_http_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(), ttl: None, recall_period: None, - pageserver_auth_token: env::var("PAGESERVER_AUTH_TOKEN").ok(), } } } diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index a653c41922..a7b9797770 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -28,20 +28,22 @@ pub struct ReceiveWalConn<'pg> { pg_backend: &'pg mut PostgresBackend, /// The cached result of `pg_backend.socket().peer_addr()` (roughly) peer_addr: SocketAddr, + /// Pageserver connection string forwarded from compute + /// NOTE that it is allowed to operate without a pageserver. + /// So if compute has no pageserver configured do not use it. + pageserver_connstr: Option, } /// /// Periodically request pageserver to call back. /// If pageserver already has replication channel, it will just ignore this request /// -fn request_callback(conf: SafeKeeperConf, timelineid: ZTimelineId, tenantid: ZTenantId) { - let ps_addr = conf.pageserver_addr.unwrap(); - let ps_connstr = format!( - "postgresql://no_user:{}@{}/no_db", - &conf.pageserver_auth_token.unwrap_or_default(), - ps_addr - ); - +fn request_callback( + conf: SafeKeeperConf, + pageserver_connstr: String, + timelineid: ZTimelineId, + tenantid: ZTenantId, +) { // use Config parsing because SockAddr parsing doesnt allow to use host names instead of ip addresses let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_pg_addr); let me_conf: Config = me_connstr.parse().unwrap(); @@ -54,15 +56,18 @@ fn request_callback(conf: SafeKeeperConf, timelineid: ZTimelineId, tenantid: ZTe loop { info!( "requesting page server to connect to us: start {} {}", - ps_connstr, callme + pageserver_connstr, callme ); - match Client::connect(&ps_connstr, NoTls) { + match Client::connect(&pageserver_connstr, NoTls) { Ok(mut client) => { if let Err(e) = client.simple_query(&callme) { error!("Failed to send callme request to pageserver: {}", e); } } - Err(e) => error!("Failed to connect to pageserver {}: {}", &ps_connstr, e), + Err(e) => error!( + "Failed to connect to pageserver {}: {}", + &pageserver_connstr, e + ), } if let Some(period) = conf.recall_period { @@ -74,11 +79,15 @@ fn request_callback(conf: SafeKeeperConf, timelineid: ZTimelineId, tenantid: ZTe } impl<'pg> ReceiveWalConn<'pg> { - pub fn new(pg: &'pg mut PostgresBackend) -> ReceiveWalConn<'pg> { + pub fn new( + pg: &'pg mut PostgresBackend, + pageserver_connstr: Option, + ) -> ReceiveWalConn<'pg> { let peer_addr = *pg.get_peer_addr(); ReceiveWalConn { pg_backend: pg, peer_addr, + pageserver_connstr, } } @@ -127,17 +136,17 @@ impl<'pg> ReceiveWalConn<'pg> { _ => bail!("unexpected message {:?} instead of greeting", msg), } - // if requested, ask pageserver to fetch wal from us - // xxx: this place seems not really fitting - if swh.conf.pageserver_addr.is_some() { - // Need to establish replication channel with page server. - // Add far as replication in postgres is initiated by receiver, we should use callme mechanism + // Need to establish replication channel with page server. + // Add far as replication in postgres is initiated by receiver, we should use callme mechanism + if let Some(ref pageserver_connstr) = self.pageserver_connstr { let conf = swh.conf.clone(); let timelineid = swh.timeline.get().timelineid; + // copy to safely move to a thread + let pageserver_connstr = pageserver_connstr.to_owned(); let _ = thread::Builder::new() .name("request_callback thread".into()) .spawn(move || { - request_callback(conf, timelineid, tenant_id); + request_callback(conf, pageserver_connstr, timelineid, tenant_id); }) .unwrap(); } diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index a97fed052b..a86d8d9305 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -46,6 +46,7 @@ impl postgres_backend::Handler for SendWalHandler { if let Some(app_name) = sm.params.get("application_name") { self.appname = Some(app_name.clone()); } + Ok(()) } @@ -75,7 +76,17 @@ impl postgres_backend::Handler for SendWalHandler { } else if query_string.starts_with(b"START_REPLICATION") { ReplicationConn::new(pgb).run(self, pgb, &query_string)?; } else if query_string.starts_with(b"START_WAL_PUSH") { - ReceiveWalConn::new(pgb) + // TODO: this repeats query decoding logic from page_service so it is probably + // a good idea to refactor it in pgbackend and pass string to process query instead of bytes + let decoded_query_string = match query_string.last() { + Some(0) => std::str::from_utf8(&query_string[..query_string.len() - 1])?, + _ => std::str::from_utf8(&query_string)?, + }; + let pageserver_connstr = decoded_query_string + .split_whitespace() + .nth(1) + .map(|s| s.to_owned()); + ReceiveWalConn::new(pgb, pageserver_connstr) .run(self) .with_context(|| "failed to run ReceiveWalConn")?; } else if query_string.starts_with(b"JSON_CTRL") {