From 8331ce865c69a72bae83b5d772248bd1a2a0ac94 Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Mon, 27 Sep 2021 10:44:27 +0300 Subject: [PATCH] Interceipt and log error in mgmt interface. That PostgresBackend is better be replaced with the http server or redis subscription. For now let's improve logging and move on. --- proxy/src/mgmt.rs | 58 ++++++++++++++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 21 deletions(-) diff --git a/proxy/src/mgmt.rs b/proxy/src/mgmt.rs index 90fccd770d..2b3259f8ec 100644 --- a/proxy/src/mgmt.rs +++ b/proxy/src/mgmt.rs @@ -49,7 +49,7 @@ struct MgmtHandler { // "host": "127.0.0.1", // "port": 5432, // "dbname": "stas", -// "user": "stas" +// "user": "stas", // "password": "mypass" // } // } @@ -60,6 +60,9 @@ struct MgmtHandler { // "Failure": "oops" // } // } +// +// // to test manually by sending a query to mgmt interface: +// psql -h 127.0.0.1 -p 9999 -c '{"session_id":"4f10dde522e14739","result":{"Success":{"host":"127.0.0.1","port":5432,"dbname":"stas","user":"stas","password":"stas"}}}' #[derive(Deserialize)] pub struct PsqlSessionResponse { session_id: String, @@ -78,34 +81,47 @@ impl postgres_backend::Handler for MgmtHandler { pgb: &mut PostgresBackend, query_string: Bytes, ) -> anyhow::Result<()> { - let query_string = query_from_cstring(query_string); + let res = try_process_query(self, pgb, query_string); + // intercept and log error message + if res.is_err() { + println!("Mgmt query failed: #{:?}", res); + } + res + } +} - println!("Got mgmt query: '{}'", std::str::from_utf8(&query_string)?); +fn try_process_query( + mgmt: &mut MgmtHandler, + pgb: &mut PostgresBackend, + query_string: Bytes, +) -> anyhow::Result<()> { + let query_string = query_from_cstring(query_string); - let resp: PsqlSessionResponse = serde_json::from_slice(&query_string)?; + println!("Got mgmt query: '{}'", std::str::from_utf8(&query_string)?); - let waiters = self.state.waiters.lock().unwrap(); + let resp: PsqlSessionResponse = serde_json::from_slice(&query_string)?; - let sender = waiters - .get(&resp.session_id) - .ok_or_else(|| anyhow::Error::msg("psql_session_id is not found"))?; + let waiters = mgmt.state.waiters.lock().unwrap(); - match resp.result { - PsqlSessionResult::Success(db_info) => { - sender.send(Ok(db_info))?; + let sender = waiters + .get(&resp.session_id) + .ok_or_else(|| anyhow::Error::msg("psql_session_id is not found"))?; - pgb.write_message_noflush(&SINGLE_COL_ROWDESC)? - .write_message_noflush(&BeMessage::DataRow(&[Some(b"ok")]))? - .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - pgb.flush()?; - Ok(()) - } + match resp.result { + PsqlSessionResult::Success(db_info) => { + sender.send(Ok(db_info))?; - PsqlSessionResult::Failure(message) => { - sender.send(Err(anyhow::Error::msg(message.clone())))?; + pgb.write_message_noflush(&SINGLE_COL_ROWDESC)? + .write_message_noflush(&BeMessage::DataRow(&[Some(b"ok")]))? + .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; + pgb.flush()?; + Ok(()) + } - bail!("psql session request failed: {}", message) - } + PsqlSessionResult::Failure(message) => { + sender.send(Err(anyhow::Error::msg(message.clone())))?; + + bail!("psql session request failed: {}", message) } } }