From d423142623890cfaee2b4f66df459f5b6d65c760 Mon Sep 17 00:00:00 2001 From: Alexey Kondratov Date: Fri, 22 Oct 2021 18:21:35 +0300 Subject: [PATCH] Proxy: wait for kick on .pgpass connection (zenithdb/console#227) --- proxy/src/cplane_api.rs | 20 ++++++++--- proxy/src/proxy.rs | 76 +++++++++++++++++++++++++++++------------ 2 files changed, 69 insertions(+), 27 deletions(-) diff --git a/proxy/src/cplane_api.rs b/proxy/src/cplane_api.rs index 3435dca7b2..2579b9a73d 100644 --- a/proxy/src/cplane_api.rs +++ b/proxy/src/cplane_api.rs @@ -15,6 +15,13 @@ pub struct DatabaseInfo { pub password: Option, } +#[derive(Serialize, Deserialize, Debug)] +pub struct ProxyAuthResult { + pub ready: bool, + pub error: Option, + pub conn_info: Option, +} + impl DatabaseInfo { pub fn socket_addr(&self) -> Result { let host_port = format!("{}:{}", self.host, self.port); @@ -55,22 +62,25 @@ impl CPlaneApi { database: &str, md5_response: &[u8], salt: &[u8; 4], - ) -> Result { + psql_session_id: &str, + ) -> Result { let mut url = reqwest::Url::parse(self.auth_endpoint)?; url.query_pairs_mut() .append_pair("login", user) .append_pair("database", database) .append_pair("md5response", std::str::from_utf8(md5_response)?) - .append_pair("salt", &hex::encode(salt)); + .append_pair("salt", &hex::encode(salt)) + .append_pair("psql_session_id", psql_session_id); println!("cplane request: {}", url.as_str()); let resp = reqwest::blocking::get(url)?; if resp.status().is_success() { - let conn_info: DatabaseInfo = serde_json::from_str(resp.text()?.as_str())?; - println!("got conn info: #{:?}", conn_info); - Ok(conn_info) + let auth_info: ProxyAuthResult = serde_json::from_str(resp.text()?.as_str())?; + println!("got auth info: #{:?}", auth_info); + + Ok(auth_info) } else { bail!("Auth failed") } diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 1debabae9c..2b15c4e55f 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -75,8 +75,12 @@ pub fn proxy_conn_main( // This will set conn.existing_user and we can decide on next actions conn.handle_startup()?; + let mut psql_session_id_buf = [0u8; 8]; + rand::thread_rng().fill(&mut psql_session_id_buf); + conn.psql_session_id = hex::encode(psql_session_id_buf); + // both scenarious here should end up producing database connection string - let db_info = if conn.is_existing_user() { + let conn_info = if conn.is_existing_user() { conn.handle_existing_user()? } else { conn.handle_new_user()? @@ -84,7 +88,7 @@ pub fn proxy_conn_main( // XXX: move that inside handle_new_user/handle_existing_user to be able to // report wrong connection error. - proxy_pass(conn.pgb, db_info) + proxy_pass(conn.pgb, conn_info) } impl ProxyConnection { @@ -156,6 +160,21 @@ impl ProxyConnection { Ok(()) } + // Wait for proxy kick form the console with conninfo + fn wait_for_conninfo(&mut self) -> anyhow::Result { + let (tx, rx) = channel::>(); + let _ = self + .state + .waiters + .lock() + .unwrap() + .insert(self.psql_session_id.clone(), tx); + + // Wait for web console response + // TODO: respond with error to client + rx.recv()? + } + fn handle_existing_user(&mut self) -> anyhow::Result { // ask password rand::thread_rng().fill(&mut self.md5_salt); @@ -182,14 +201,41 @@ impl ProxyConnection { self.database.as_str(), md5_response, &self.md5_salt, + &self.psql_session_id, ) { Err(e) => { - self.pgb - .write_message(&BeMessage::ErrorResponse(format!("{}", e)))?; + self.pgb.write_message(&BeMessage::ErrorResponse(format!( + "cannot authenticate proxy: {}", + e + )))?; bail!("auth failed: {}", e); } - Ok(conn_info) => { + + Ok(auth_info) => { + let conn_info = if auth_info.ready { + // Cluster is ready, so just take `conn_info` and respond to the client. + auth_info + .conn_info + .expect("conn_info should be provided with ready cluster") + } else { + match auth_info.error { + Some(e) => { + self.pgb.write_message(&BeMessage::ErrorResponse(format!( + "cannot authenticate proxy: {}", + e + )))?; + + bail!("auth failed: {}", e); + } + None => { + // Cluster exists, but isn't active, await its start and proxy kick + // with `conn_info`. + self.wait_for_conninfo()? + } + } + }; + self.pgb .write_message_noflush(&BeMessage::AuthenticationOk)?; self.pgb @@ -205,10 +251,6 @@ impl ProxyConnection { } fn handle_new_user(&mut self) -> anyhow::Result { - let mut psql_session_id_buf = [0u8; 8]; - rand::thread_rng().fill(&mut psql_session_id_buf); - self.psql_session_id = hex::encode(psql_session_id_buf); - let hello_message = format!("☀️ Welcome to Zenith! To proceed with database creation, open the following link: @@ -227,25 +269,15 @@ databases without opening the browser. self.pgb .write_message(&BeMessage::NoticeResponse(hello_message))?; - // await for database creation - let (tx, rx) = channel::>(); - let _ = self - .state - .waiters - .lock() - .unwrap() - .insert(self.psql_session_id.clone(), tx); - - // Wait for web console response - // XXX: respond with error to client - let dbinfo = rx.recv()??; + // We requested the DB creation from the console. Now wait for conninfo + let conn_info = self.wait_for_conninfo()?; self.pgb.write_message_noflush(&BeMessage::NoticeResponse( "Connecting to database.".to_string(), ))?; self.pgb.write_message(&BeMessage::ReadyForQuery)?; - Ok(dbinfo) + Ok(conn_info) } }