mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 05:22:56 +00:00
Implement mock console
This commit is contained in:
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -86,9 +86,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.51"
|
||||
version = "0.1.52"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e"
|
||||
checksum = "061a7acccaa286c011ddc30970520b98fa40e00c9d644633fb26b5fc63a265e3"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -1564,6 +1564,8 @@ name = "proxy"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"base64 0.13.0",
|
||||
"bytes",
|
||||
"clap",
|
||||
"futures",
|
||||
|
||||
@@ -31,6 +31,8 @@ scopeguard = "1.1.0"
|
||||
|
||||
zenith_utils = { path = "../zenith_utils" }
|
||||
zenith_metrics = { path = "../zenith_metrics" }
|
||||
base64 = "0.13.0"
|
||||
async-trait = "0.1.52"
|
||||
|
||||
[dev-dependencies]
|
||||
tokio-postgres-rustls = "0.8.0"
|
||||
|
||||
41
proxy/src/auth.rs
Normal file
41
proxy/src/auth.rs
Normal file
@@ -0,0 +1,41 @@
|
||||
use crate::db::AuthSecret;
|
||||
use crate::stream::PqStream;
|
||||
use bytes::Bytes;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use zenith_utils::pq_proto::BeMessage as Be;
|
||||
|
||||
|
||||
/// Stored secret for authenticating the user via md5 but authenticating
|
||||
/// to the compute database with a (possibly different) plaintext password.
|
||||
pub struct PlaintextStoredSecret {
|
||||
pub salt: [u8; 4],
|
||||
pub hashed_salted_password: Bytes,
|
||||
pub compute_db_password: String,
|
||||
}
|
||||
|
||||
/// Sufficient information to auth user and create AuthSecret
|
||||
#[non_exhaustive]
|
||||
pub enum StoredSecret {
|
||||
PlaintextPassword(PlaintextStoredSecret),
|
||||
// TODO add md5 option?
|
||||
// TODO add SCRAM option
|
||||
}
|
||||
|
||||
pub async fn authenticate(
|
||||
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
|
||||
stored_secret: StoredSecret
|
||||
) -> anyhow::Result<AuthSecret> {
|
||||
match stored_secret {
|
||||
StoredSecret::PlaintextPassword(stored) => {
|
||||
client.write_message(&Be::AuthenticationMD5Password(&stored.salt)).await?;
|
||||
let provided = client.read_password_message().await?;
|
||||
anyhow::ensure!(provided == stored.hashed_salted_password);
|
||||
Ok(AuthSecret::Password(stored.compute_db_password))
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait SecretStore {
|
||||
async fn get_stored_secret(&self, creds: &crate::cplane_api::ClientCredentials) -> anyhow::Result<StoredSecret>;
|
||||
}
|
||||
7
proxy/src/compute.rs
Normal file
7
proxy/src/compute.rs
Normal file
@@ -0,0 +1,7 @@
|
||||
use crate::{cplane_api::ClientCredentials, db::DatabaseConnInfo};
|
||||
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait ComputeProvider {
|
||||
async fn get_compute_node(&self, creds: &ClientCredentials) -> anyhow::Result<DatabaseConnInfo>;
|
||||
}
|
||||
@@ -45,35 +45,6 @@ enum ProxyAuthResponse {
|
||||
NotReady { ready: bool }, // TODO: get rid of `ready`
|
||||
}
|
||||
|
||||
impl DatabaseInfo {
|
||||
pub fn socket_addr(&self) -> anyhow::Result<SocketAddr> {
|
||||
let host_port = format!("{}:{}", self.host, self.port);
|
||||
host_port
|
||||
.to_socket_addrs()
|
||||
.with_context(|| format!("cannot resolve {} to SocketAddr", host_port))?
|
||||
.next()
|
||||
.context("cannot resolve at least one SocketAddr")
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DatabaseInfo> for tokio_postgres::Config {
|
||||
fn from(db_info: DatabaseInfo) -> Self {
|
||||
let mut config = tokio_postgres::Config::new();
|
||||
|
||||
config
|
||||
.host(&db_info.host)
|
||||
.port(db_info.port)
|
||||
.dbname(&db_info.dbname)
|
||||
.user(&db_info.user);
|
||||
|
||||
if let Some(password) = db_info.password {
|
||||
config.password(password);
|
||||
}
|
||||
|
||||
config
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CPlaneApi<'a> {
|
||||
auth_endpoint: &'a str,
|
||||
waiters: &'a ProxyWaiters,
|
||||
|
||||
58
proxy/src/db.rs
Normal file
58
proxy/src/db.rs
Normal file
@@ -0,0 +1,58 @@
|
||||
///
|
||||
/// Utils for connecting with the postgres dataabase.
|
||||
///
|
||||
|
||||
use std::net::{SocketAddr, ToSocketAddrs};
|
||||
use anyhow::{Context, anyhow};
|
||||
|
||||
use crate::cplane_api::ClientCredentials;
|
||||
|
||||
pub struct DatabaseConnInfo {
|
||||
pub host: String,
|
||||
pub port: u16,
|
||||
}
|
||||
|
||||
pub struct DatabaseAuthInfo {
|
||||
pub conn_info: DatabaseConnInfo,
|
||||
pub creds: ClientCredentials,
|
||||
pub auth_secret: AuthSecret,
|
||||
}
|
||||
|
||||
/// Sufficient information to auth with database
|
||||
#[non_exhaustive]
|
||||
#[derive(Debug)]
|
||||
pub enum AuthSecret {
|
||||
Password(String),
|
||||
// TODO add SCRAM option
|
||||
}
|
||||
|
||||
impl From<DatabaseAuthInfo> for tokio_postgres::Config {
|
||||
fn from(auth_info: DatabaseAuthInfo) -> Self {
|
||||
let mut config = tokio_postgres::Config::new();
|
||||
|
||||
config
|
||||
.host(&auth_info.conn_info.host)
|
||||
.port(auth_info.conn_info.port)
|
||||
.dbname(&auth_info.creds.dbname)
|
||||
.user(&auth_info.creds.user);
|
||||
|
||||
match auth_info.auth_secret {
|
||||
AuthSecret::Password(password) => {
|
||||
config.password(password);
|
||||
}
|
||||
}
|
||||
|
||||
config
|
||||
}
|
||||
}
|
||||
|
||||
impl DatabaseConnInfo {
|
||||
pub fn socket_addr(&self) -> anyhow::Result<SocketAddr> {
|
||||
let host_port = format!("{}:{}", self.host, self.port);
|
||||
host_port
|
||||
.to_socket_addrs()
|
||||
.with_context(|| format!("cannot resolve {} to SocketAddr", host_port))?
|
||||
.next()
|
||||
.ok_or_else(|| anyhow!("cannot resolve at least one SocketAddr"))
|
||||
}
|
||||
}
|
||||
@@ -10,6 +10,10 @@ use clap::{App, Arg};
|
||||
use state::{ProxyConfig, ProxyState};
|
||||
use zenith_utils::{tcp_listener, GIT_VERSION};
|
||||
|
||||
mod compute;
|
||||
mod mock;
|
||||
mod auth;
|
||||
mod db;
|
||||
mod cancellation;
|
||||
mod cplane_api;
|
||||
mod http;
|
||||
|
||||
32
proxy/src/mock.rs
Normal file
32
proxy/src/mock.rs
Normal file
@@ -0,0 +1,32 @@
|
||||
use bytes::Bytes;
|
||||
|
||||
use crate::{auth::{PlaintextStoredSecret, SecretStore, StoredSecret}, compute::ComputeProvider, cplane_api::ClientCredentials, db::DatabaseConnInfo};
|
||||
|
||||
|
||||
pub struct MockConsole {
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl SecretStore for MockConsole {
|
||||
async fn get_stored_secret(&self, creds: &ClientCredentials) -> anyhow::Result<StoredSecret> {
|
||||
let salt = [0; 4];
|
||||
match (&creds.user[..], &creds.dbname[..]) {
|
||||
("postgres", "postgres") => Ok(StoredSecret::PlaintextPassword(PlaintextStoredSecret {
|
||||
salt,
|
||||
hashed_salted_password: "md52fff09cd9def51601fc5445943b3a11f\0".into(),
|
||||
compute_db_password: "postgres".into(),
|
||||
})),
|
||||
_ => unimplemented!()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ComputeProvider for MockConsole{
|
||||
async fn get_compute_node(&self, creds: &ClientCredentials) -> anyhow::Result<DatabaseConnInfo> {
|
||||
return Ok(DatabaseConnInfo {
|
||||
host: "127.0.0.1".into(),
|
||||
port: 5432,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,9 @@
|
||||
use crate::auth::{self, StoredSecret, SecretStore};
|
||||
use crate::cancellation::{self, CancelClosure};
|
||||
use crate::compute::ComputeProvider;
|
||||
use crate::cplane_api as cplane;
|
||||
use crate::db::{AuthSecret, DatabaseAuthInfo};
|
||||
use crate::mock::MockConsole;
|
||||
use crate::state::SslConfig;
|
||||
use crate::stream::{PqStream, Stream};
|
||||
use crate::ProxyState;
|
||||
@@ -140,24 +144,28 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: implement proper authentication
|
||||
async fn connect_client_to_db(
|
||||
mut client: PqStream<impl AsyncRead + AsyncWrite + Unpin>,
|
||||
creds: cplane::ClientCredentials,
|
||||
session: cancellation::Session,
|
||||
) -> anyhow::Result<()> {
|
||||
// TODO: get this from an api call
|
||||
let db_info = cplane::DatabaseInfo {
|
||||
host: "127.0.0.1".into(),
|
||||
port: 5432,
|
||||
dbname: creds.dbname,
|
||||
user: "dmitry".into(),
|
||||
password: None,
|
||||
// Authenticate
|
||||
// TODO use real console
|
||||
let console = MockConsole {};
|
||||
let stored_secret = console.get_stored_secret(&creds).await?;
|
||||
let auth_secret = auth::authenticate(&mut client, stored_secret).await?;
|
||||
let conn_info = console.get_compute_node(&creds).await?;
|
||||
let db_auth_info = DatabaseAuthInfo {
|
||||
conn_info,
|
||||
creds,
|
||||
auth_secret,
|
||||
};
|
||||
|
||||
let (mut db, version, cancel_closure) = connect_to_db(db_info).await?;
|
||||
// Connect to db
|
||||
let (mut db, version, cancel_closure) = connect_to_db(db_auth_info).await?;
|
||||
let cancel_key_data = session.enable_cancellation(cancel_closure);
|
||||
|
||||
// Report success to client
|
||||
client
|
||||
.write_message_noflush(&Be::AuthenticationOk)?
|
||||
.write_message_noflush(&BeParameterStatusMessage::encoding())?
|
||||
@@ -191,10 +199,10 @@ fn hello_message(redirect_uri: &str, session_id: &str) -> String {
|
||||
|
||||
/// Connect to a corresponding compute node.
|
||||
async fn connect_to_db(
|
||||
db_info: cplane::DatabaseInfo,
|
||||
db_info: DatabaseAuthInfo,
|
||||
) -> anyhow::Result<(TcpStream, String, CancelClosure)> {
|
||||
// TODO: establish a secure connection to the DB
|
||||
let socket_addr = db_info.socket_addr()?;
|
||||
let socket_addr = db_info.conn_info.socket_addr()?;
|
||||
let mut socket = TcpStream::connect(socket_addr).await?;
|
||||
|
||||
let (client, conn) = tokio_postgres::Config::from(db_info)
|
||||
|
||||
@@ -49,6 +49,14 @@ impl<S: AsyncRead + Unpin> PqStream<S> {
|
||||
other => anyhow::bail!("bad message type: {:?}", other),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn read_password_message(&mut self) -> anyhow::Result<bytes::Bytes> {
|
||||
match FeMessage::read_fut(&mut self.stream).await? {
|
||||
Some(FeMessage::PasswordMessage(msg)) => Ok(msg),
|
||||
None => anyhow::bail!("connection is lost"),
|
||||
other => anyhow::bail!("bad message type: {:?}", other),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncWrite + Unpin> PqStream<S> {
|
||||
|
||||
Reference in New Issue
Block a user