Proxy: wait for kick on .pgpass connection (zenithdb/console#227)

This commit is contained in:
Alexey Kondratov
2021-10-22 18:21:35 +03:00
committed by Alexey Kondratov
parent 1c0e85f9a0
commit d423142623
2 changed files with 69 additions and 27 deletions

View File

@@ -15,6 +15,13 @@ pub struct DatabaseInfo {
pub password: Option<String>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct ProxyAuthResult {
pub ready: bool,
pub error: Option<String>,
pub conn_info: Option<DatabaseInfo>,
}
impl DatabaseInfo {
pub fn socket_addr(&self) -> Result<SocketAddr> {
let host_port = format!("{}:{}", self.host, self.port);
@@ -55,22 +62,25 @@ impl CPlaneApi {
database: &str,
md5_response: &[u8],
salt: &[u8; 4],
) -> Result<DatabaseInfo> {
psql_session_id: &str,
) -> Result<ProxyAuthResult> {
let mut url = reqwest::Url::parse(self.auth_endpoint)?;
url.query_pairs_mut()
.append_pair("login", user)
.append_pair("database", database)
.append_pair("md5response", std::str::from_utf8(md5_response)?)
.append_pair("salt", &hex::encode(salt));
.append_pair("salt", &hex::encode(salt))
.append_pair("psql_session_id", psql_session_id);
println!("cplane request: {}", url.as_str());
let resp = reqwest::blocking::get(url)?;
if resp.status().is_success() {
let conn_info: DatabaseInfo = serde_json::from_str(resp.text()?.as_str())?;
println!("got conn info: #{:?}", conn_info);
Ok(conn_info)
let auth_info: ProxyAuthResult = serde_json::from_str(resp.text()?.as_str())?;
println!("got auth info: #{:?}", auth_info);
Ok(auth_info)
} else {
bail!("Auth failed")
}

View File

@@ -75,8 +75,12 @@ pub fn proxy_conn_main(
// This will set conn.existing_user and we can decide on next actions
conn.handle_startup()?;
let mut psql_session_id_buf = [0u8; 8];
rand::thread_rng().fill(&mut psql_session_id_buf);
conn.psql_session_id = hex::encode(psql_session_id_buf);
// both scenarious here should end up producing database connection string
let db_info = if conn.is_existing_user() {
let conn_info = if conn.is_existing_user() {
conn.handle_existing_user()?
} else {
conn.handle_new_user()?
@@ -84,7 +88,7 @@ pub fn proxy_conn_main(
// XXX: move that inside handle_new_user/handle_existing_user to be able to
// report wrong connection error.
proxy_pass(conn.pgb, db_info)
proxy_pass(conn.pgb, conn_info)
}
impl ProxyConnection {
@@ -156,6 +160,21 @@ impl ProxyConnection {
Ok(())
}
// Wait for proxy kick form the console with conninfo
fn wait_for_conninfo(&mut self) -> anyhow::Result<DatabaseInfo> {
let (tx, rx) = channel::<anyhow::Result<DatabaseInfo>>();
let _ = self
.state
.waiters
.lock()
.unwrap()
.insert(self.psql_session_id.clone(), tx);
// Wait for web console response
// TODO: respond with error to client
rx.recv()?
}
fn handle_existing_user(&mut self) -> anyhow::Result<DatabaseInfo> {
// ask password
rand::thread_rng().fill(&mut self.md5_salt);
@@ -182,14 +201,41 @@ impl ProxyConnection {
self.database.as_str(),
md5_response,
&self.md5_salt,
&self.psql_session_id,
) {
Err(e) => {
self.pgb
.write_message(&BeMessage::ErrorResponse(format!("{}", e)))?;
self.pgb.write_message(&BeMessage::ErrorResponse(format!(
"cannot authenticate proxy: {}",
e
)))?;
bail!("auth failed: {}", e);
}
Ok(conn_info) => {
Ok(auth_info) => {
let conn_info = if auth_info.ready {
// Cluster is ready, so just take `conn_info` and respond to the client.
auth_info
.conn_info
.expect("conn_info should be provided with ready cluster")
} else {
match auth_info.error {
Some(e) => {
self.pgb.write_message(&BeMessage::ErrorResponse(format!(
"cannot authenticate proxy: {}",
e
)))?;
bail!("auth failed: {}", e);
}
None => {
// Cluster exists, but isn't active, await its start and proxy kick
// with `conn_info`.
self.wait_for_conninfo()?
}
}
};
self.pgb
.write_message_noflush(&BeMessage::AuthenticationOk)?;
self.pgb
@@ -205,10 +251,6 @@ impl ProxyConnection {
}
fn handle_new_user(&mut self) -> anyhow::Result<DatabaseInfo> {
let mut psql_session_id_buf = [0u8; 8];
rand::thread_rng().fill(&mut psql_session_id_buf);
self.psql_session_id = hex::encode(psql_session_id_buf);
let hello_message = format!("☀️ Welcome to Zenith!
To proceed with database creation, open the following link:
@@ -227,25 +269,15 @@ databases without opening the browser.
self.pgb
.write_message(&BeMessage::NoticeResponse(hello_message))?;
// await for database creation
let (tx, rx) = channel::<anyhow::Result<DatabaseInfo>>();
let _ = self
.state
.waiters
.lock()
.unwrap()
.insert(self.psql_session_id.clone(), tx);
// Wait for web console response
// XXX: respond with error to client
let dbinfo = rx.recv()??;
// We requested the DB creation from the console. Now wait for conninfo
let conn_info = self.wait_for_conninfo()?;
self.pgb.write_message_noflush(&BeMessage::NoticeResponse(
"Connecting to database.".to_string(),
))?;
self.pgb.write_message(&BeMessage::ReadyForQuery)?;
Ok(dbinfo)
Ok(conn_info)
}
}