Files
neon/proxy/src/mgmt.rs
2022-11-03 22:56:04 +03:00

128 lines
3.5 KiB
Rust

use crate::auth;
use anyhow::Context;
use pq_proto::{BeMessage, SINGLE_COL_ROWDESC};
use serde::Deserialize;
use std::{
net::{TcpListener, TcpStream},
thread,
};
use tracing::{error, info};
use utils::postgres_backend::{self, AuthType, PostgresBackend};
/// TODO: move all of that to auth-backend/link.rs when we ditch legacy-console backend
///
/// Main proxy listener loop.
///
/// Listens for connections, and launches a new handler thread for each.
///
pub fn thread_main(listener: TcpListener) -> anyhow::Result<()> {
scopeguard::defer! {
info!("mgmt has shut down");
}
listener
.set_nonblocking(false)
.context("failed to set listener to blocking")?;
loop {
let (socket, peer_addr) = listener.accept().context("failed to accept a new client")?;
info!("accepted connection from {peer_addr}");
socket
.set_nodelay(true)
.context("failed to set client socket option")?;
thread::spawn(move || {
if let Err(err) = handle_connection(socket) {
error!("{err}");
}
});
}
}
/// Serves a single `mgmt` connection.
///
/// Wraps `socket` in a [`PostgresBackend`] created with [`AuthType::Trust`]
/// and runs it with a [`MgmtHandler`], returning whatever the backend returns.
fn handle_connection(socket: TcpStream) -> anyhow::Result<()> {
    let mut handler = MgmtHandler;
    let backend = PostgresBackend::new(socket, AuthType::Trust, None, true)?;
    backend.run(&mut handler)
}
/// Query handler for `mgmt` connections.
///
/// Carries no state; it exists to implement `postgres_backend::Handler`.
struct MgmtHandler;
/// JSON message received by `mgmt` as the text of a query.
///
/// Serialized examples:
///
/// ```json
/// {
///   "session_id": "71d6d03e6d93d99a",
///   "result": {
///     "Success": {
///       "host": "127.0.0.1",
///       "port": 5432,
///       "dbname": "stas",
///       "user": "stas",
///       "password": "mypass"
///     }
///   }
/// }
/// ```
///
/// ```json
/// {
///   "session_id": "71d6d03e6d93d99a",
///   "result": {
///     "Failure": "oops"
///   }
/// }
/// ```
///
/// To test manually by sending a query to mgmt interface:
///
/// ```text
/// psql -h 127.0.0.1 -p 9999 -c '{"session_id":"4f10dde522e14739","result":{"Success":{"host":"127.0.0.1","port":5432,"dbname":"stas","user":"stas","password":"stas"}}}'
/// ```
#[derive(Deserialize)]
struct PsqlSessionResponse {
/// Session the message refers to; handed to `auth::backend::notify`.
session_id: String,
/// Outcome reported for that session.
result: PsqlSessionResult,
}
/// Outcome carried in the `result` field of a `PsqlSessionResponse`.
///
/// Converted into a `ComputeReady` by `into_compute_ready`.
#[derive(Deserialize)]
enum PsqlSessionResult {
/// Database info reported on success (becomes `Ok`).
Success(auth::DatabaseInfo),
/// Error message reported on failure (becomes `Err`).
Failure(String),
}
/// A message received by `mgmt` when a compute node is ready.
///
/// `Ok` holds the database info of a successful report; `Err` holds the
/// message of a failed one.
pub type ComputeReady = Result<auth::DatabaseInfo, String>;
impl PsqlSessionResult {
fn into_compute_ready(self) -> ComputeReady {
match self {
Self::Success(db_info) => Ok(db_info),
Self::Failure(message) => Err(message),
}
}
}
impl postgres_backend::Handler for MgmtHandler {
fn process_query(
&mut self,
pgb: &mut PostgresBackend,
query_string: &str,
) -> anyhow::Result<()> {
let res = try_process_query(pgb, query_string);
// intercept and log error message
if res.is_err() {
error!("mgmt query failed: {res:?}");
}
res
}
}
fn try_process_query(pgb: &mut PostgresBackend, query_string: &str) -> anyhow::Result<()> {
info!("got mgmt query [redacted]"); // Content contains password, don't print it
let resp: PsqlSessionResponse = serde_json::from_str(query_string)?;
match auth::backend::notify(&resp.session_id, resp.result.into_compute_ready()) {
Ok(()) => {
pgb.write_message_noflush(&SINGLE_COL_ROWDESC)?
.write_message_noflush(&BeMessage::DataRow(&[Some(b"ok")]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
}
Err(e) => {
pgb.write_message(&BeMessage::ErrorResponse(&e.to_string()))?;
}
}
Ok(())
}