Compare commits

...

2 Commits

Author           SHA1        Message                          Date
Bojan Serafimov  64fcf4f096  Implement mock console           2022-02-09 14:30:01 -05:00
Dmitry Ivanov    18d3d078ad  [WIP] [proxy] Migrate to async   2022-02-08 05:43:32 +03:00
14 changed files with 814 additions and 374 deletions

Cargo.lock (generated)

@@ -23,6 +23,17 @@ version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "739f4a8db6605981345c5654f3a85b056ce52f37a39d34da03f25bf2151ea16e"
[[package]]
name = "ahash"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47"
dependencies = [
"getrandom",
"once_cell",
"version_check",
]
[[package]]
name = "aho-corasick"
version = "0.7.18"
@@ -75,9 +86,9 @@ dependencies = [
[[package]]
name = "async-trait"
version = "0.1.51"
version = "0.1.52"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e"
checksum = "061a7acccaa286c011ddc30970520b98fa40e00c9d644633fb26b5fc63a265e3"
dependencies = [
"proc-macro2",
"quote",
@@ -772,7 +783,7 @@ version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04"
dependencies = [
"ahash",
"ahash 0.4.7",
]
[[package]]
@@ -780,6 +791,9 @@ name = "hashbrown"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
dependencies = [
"ahash 0.7.6",
]
[[package]]
name = "hermit-abi"
@@ -980,7 +994,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afabcc15e437a6484fc4f12d0fd63068fe457bf93f1c148d3d9649c60b103f32"
dependencies = [
"base64 0.12.3",
"pem",
"pem 0.8.3",
"ring",
"serde",
"serde_json",
@@ -1339,6 +1353,15 @@ dependencies = [
"regex",
]
[[package]]
name = "pem"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9a3b09a20e374558580a4914d3b7d89bd61b954a5a5e1dcbea98753addb1947"
dependencies = [
"base64 0.13.0",
]
[[package]]
name = "percent-encoding"
version = "2.1.0"
@@ -1541,21 +1564,30 @@ name = "proxy"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"base64 0.13.0",
"bytes",
"clap",
"futures",
"hashbrown 0.11.2",
"hex",
"hyper",
"lazy_static",
"md5",
"parking_lot",
"pin-project-lite",
"rand",
"rcgen",
"reqwest",
"routerify",
"rustls 0.19.1",
"scopeguard",
"serde",
"serde_json",
"tokio",
"tokio-postgres 0.7.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)",
"tokio-postgres-rustls",
"tokio-rustls",
"zenith_metrics",
"zenith_utils",
]
@@ -1615,6 +1647,18 @@ dependencies = [
"rand_core",
]
[[package]]
name = "rcgen"
version = "0.8.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5911d1403f4143c9d56a702069d593e8d0f3fab880a85e103604d0893ea31ba7"
dependencies = [
"chrono",
"pem 1.0.2",
"ring",
"yasna",
]
[[package]]
name = "redox_syscall"
version = "0.2.10"
@@ -2252,6 +2296,21 @@ dependencies = [
"tokio-util",
]
[[package]]
name = "tokio-postgres-rustls"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd8c37d8c23cb6ecdc32fc171bade4e9c7f1be65f693a17afbaad02091a0a19"
dependencies = [
"futures",
"ring",
"rustls 0.19.1",
"tokio",
"tokio-postgres 0.7.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=2949d98df52587d562986aad155dd4e889e408b7)",
"tokio-rustls",
"webpki 0.21.4",
]
[[package]]
name = "tokio-rustls"
version = "0.22.0"
@@ -2724,6 +2783,15 @@ version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2d7d3948613f75c98fd9328cfdcc45acc4d360655289d0a7d4ec931392200a3"
[[package]]
name = "yasna"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e262a29d0e61ccf2b6190d7050d4b237535fc76ce4c1210d9caa316f71dffa75"
dependencies = [
"chrono",
]
[[package]]
name = "zenith"
version = "0.1.0"

Cargo.toml

@@ -16,3 +16,8 @@ members = [
# This is useful for profiling and, to some extent, debug.
# Besides, debug info should not affect the performance.
debug = true
# This is only needed for proxy's tests
# TODO: we should probably fork tokio-postgres-rustls instead
[patch.crates-io]
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" }

proxy/Cargo.toml

@@ -16,13 +16,24 @@ hex = "0.4.3"
hyper = "0.14"
routerify = "2"
parking_lot = "0.11.2"
hashbrown = "0.11.2"
serde = "1"
serde_json = "1"
tokio = { version = "1.11", features = ["macros"] }
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" }
tokio-rustls = "0.22.0"
clap = "2.33.0"
rustls = "0.19.1"
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
pin-project-lite = "0.2.7"
futures = "0.3.13"
scopeguard = "1.1.0"
zenith_utils = { path = "../zenith_utils" }
zenith_metrics = { path = "../zenith_metrics" }
base64 = "0.13.0"
async-trait = "0.1.52"
[dev-dependencies]
tokio-postgres-rustls = "0.8.0"
rcgen = "0.8.14"

proxy/src/auth.rs (new file)

@@ -0,0 +1,41 @@
use crate::db::AuthSecret;
use crate::stream::PqStream;
use bytes::Bytes;
use tokio::io::{AsyncRead, AsyncWrite};
use zenith_utils::pq_proto::BeMessage as Be;
/// Stored secret for authenticating the user via md5 but authenticating
/// to the compute database with a (possibly different) plaintext password.
pub struct PlaintextStoredSecret {
pub salt: [u8; 4],
pub hashed_salted_password: Bytes,
pub compute_db_password: String,
}
/// Sufficient information to authenticate the user and create an AuthSecret
#[non_exhaustive]
pub enum StoredSecret {
PlaintextPassword(PlaintextStoredSecret),
// TODO add md5 option?
// TODO add SCRAM option
}
pub async fn authenticate(
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
stored_secret: StoredSecret
) -> anyhow::Result<AuthSecret> {
match stored_secret {
StoredSecret::PlaintextPassword(stored) => {
client.write_message(&Be::AuthenticationMD5Password(&stored.salt)).await?;
let provided = client.read_password_message().await?;
anyhow::ensure!(provided == stored.hashed_salted_password);
Ok(AuthSecret::Password(stored.compute_db_password))
},
}
}
#[async_trait::async_trait]
pub trait SecretStore {
async fn get_stored_secret(&self, creds: &crate::cplane_api::ClientCredentials) -> anyhow::Result<StoredSecret>;
}
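For reference: the bytes compared against `hashed_salted_password` above follow libpq's standard MD5 scheme, i.e. "md5" + hex(md5(hex(md5(password + user)) + salt)), NUL-terminated on the wire. A minimal sketch (not part of this diff), assuming the `md5` crate that proxy already depends on:

// Sketch only: the PasswordMessage a client would send for MD5 auth.
fn md5_auth_response(user: &str, password: &str, salt: [u8; 4]) -> Vec<u8> {
    // inner hash: md5(password || user), hex-encoded
    let inner = format!("{:x}", md5::compute(format!("{}{}", password, user)));
    // outer hash: md5(inner_hex || salt), hex-encoded
    let mut salted = inner.into_bytes();
    salted.extend_from_slice(&salt);
    let outer = format!("{:x}", md5::compute(salted));
    // trailing NUL matches what `read_password_message` returns
    format!("md5{}\0", outer).into_bytes()
}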

proxy/src/cancellation.rs (new file)

@@ -0,0 +1,90 @@
use anyhow::{anyhow, Context};
use hashbrown::HashMap;
use lazy_static::lazy_static;
use parking_lot::Mutex;
use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio_postgres::{CancelToken, NoTls};
use zenith_utils::pq_proto::CancelKeyData;
lazy_static! {
/// Enables serving CancelRequests.
static ref CANCEL_MAP: Mutex<HashMap<CancelKeyData, Option<CancelClosure>>> = Default::default();
}
/// This should've been a [`std::future::Future`], but
/// it's impossible to name a type of an unboxed future
/// (we'd need something like `#![feature(type_alias_impl_trait)]`).
#[derive(Clone)]
pub struct CancelClosure {
socket_addr: SocketAddr,
cancel_token: CancelToken,
}
impl CancelClosure {
pub fn new(socket_addr: SocketAddr, cancel_token: CancelToken) -> Self {
Self {
socket_addr,
cancel_token,
}
}
/// Cancels the query running on user's compute node.
pub async fn try_cancel_query(self) -> anyhow::Result<()> {
let socket = TcpStream::connect(self.socket_addr).await?;
self.cancel_token.cancel_query_raw(socket, NoTls).await?;
Ok(())
}
}
/// Cancel a running query for the corresponding connection.
pub async fn cancel_session(key: CancelKeyData) -> anyhow::Result<()> {
let cancel_closure = CANCEL_MAP
.lock()
.get(&key)
.and_then(|x| x.clone())
.with_context(|| format!("unknown session: {:?}", key))?;
cancel_closure.try_cancel_query().await
}
/// Helper for registering query cancellation tokens.
pub struct Session(CancelKeyData);
impl Session {
/// Store the cancel token for the given session.
pub fn enable_cancellation(self, cancel_closure: CancelClosure) -> CancelKeyData {
CANCEL_MAP.lock().insert(self.0, Some(cancel_closure));
self.0
}
}
/// Run an async action within an ephemeral session identified by [`CancelKeyData`].
pub async fn with_session<F, R, V>(f: F) -> anyhow::Result<V>
where
F: FnOnce(Session) -> R,
R: std::future::Future<Output = anyhow::Result<V>>,
{
// HACK: We'd rather get the real backend_pid but tokio_postgres doesn't
// expose it and we don't want to do another roundtrip to query
// for it. The client will be able to notice that this is not the
// actual backend_pid, but backend_pid is not used for anything
// so it doesn't matter.
let key = rand::random();
// The birthday problem is unlikely to happen here, but it's still possible
CANCEL_MAP
.lock()
.try_insert(key, None)
.map_err(|_| anyhow!("session already exists: {:?}", key))?;
// This will guarantee that the session gets dropped
// as soon as the future is finished.
scopeguard::defer! {
CANCEL_MAP.lock().remove(&key);
}
let session = Session(key);
f(session).await
}
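A hedged usage sketch of `with_session` (it mirrors how proxy.rs drives this API later in the diff; `connect_to_db` is the helper defined there, while `run_proxy` is a hypothetical stand-in for the traffic-forwarding step):

// Sketch only: run one client connection inside an ephemeral cancellation session.
async fn serve_one_client(
    client: tokio::net::TcpStream,
    db_auth_info: crate::db::DatabaseAuthInfo,
) -> anyhow::Result<()> {
    cancellation::with_session(|session| async move {
        let (db, version, cancel_closure) = connect_to_db(db_auth_info).await?;
        // Once registered, a CancelRequest carrying `key` can reach this compute node.
        let key = session.enable_cancellation(cancel_closure);
        run_proxy(client, db, key, &version).await
    })
    .await
}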

proxy/src/compute.rs (new file)

@@ -0,0 +1,7 @@
use crate::{cplane_api::ClientCredentials, db::DatabaseConnInfo};
#[async_trait::async_trait]
pub trait ComputeProvider {
async fn get_compute_node(&self, creds: &ClientCredentials) -> anyhow::Result<DatabaseConnInfo>;
}

proxy/src/cplane_api.rs

@@ -1,9 +1,33 @@
use anyhow::{anyhow, bail, Context};
use serde::{Deserialize, Serialize};
use std::net::{SocketAddr, ToSocketAddrs};
use std::collections::HashMap;
use crate::state::ProxyWaiters;
#[derive(Debug, PartialEq, Eq)]
pub struct ClientCredentials {
pub user: String,
pub dbname: String,
}
impl TryFrom<HashMap<String, String>> for ClientCredentials {
type Error = anyhow::Error;
fn try_from(mut value: HashMap<String, String>) -> Result<Self, Self::Error> {
let mut get_param = |key| {
value
.remove(key)
.with_context(|| format!("{} is missing in startup packet", key))
};
let user = get_param("user")?;
let db = get_param("database")?;
Ok(Self { user, dbname: db })
}
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct DatabaseInfo {
pub host: String,
@@ -21,35 +45,6 @@ enum ProxyAuthResponse {
NotReady { ready: bool }, // TODO: get rid of `ready`
}
impl DatabaseInfo {
pub fn socket_addr(&self) -> anyhow::Result<SocketAddr> {
let host_port = format!("{}:{}", self.host, self.port);
host_port
.to_socket_addrs()
.with_context(|| format!("cannot resolve {} to SocketAddr", host_port))?
.next()
.context("cannot resolve at least one SocketAddr")
}
}
impl From<DatabaseInfo> for tokio_postgres::Config {
fn from(db_info: DatabaseInfo) -> Self {
let mut config = tokio_postgres::Config::new();
config
.host(&db_info.host)
.port(db_info.port)
.dbname(&db_info.dbname)
.user(&db_info.user);
if let Some(password) = db_info.password {
config.password(password);
}
config
}
}
pub struct CPlaneApi<'a> {
auth_endpoint: &'a str,
waiters: &'a ProxyWaiters,

proxy/src/db.rs (new file)

@@ -0,0 +1,58 @@
///
/// Utils for connecting to the postgres database.
///
use std::net::{SocketAddr, ToSocketAddrs};
use anyhow::{Context, anyhow};
use crate::cplane_api::ClientCredentials;
pub struct DatabaseConnInfo {
pub host: String,
pub port: u16,
}
pub struct DatabaseAuthInfo {
pub conn_info: DatabaseConnInfo,
pub creds: ClientCredentials,
pub auth_secret: AuthSecret,
}
/// Sufficient information to authenticate with the database
#[non_exhaustive]
#[derive(Debug)]
pub enum AuthSecret {
Password(String),
// TODO add SCRAM option
}
impl From<DatabaseAuthInfo> for tokio_postgres::Config {
fn from(auth_info: DatabaseAuthInfo) -> Self {
let mut config = tokio_postgres::Config::new();
config
.host(&auth_info.conn_info.host)
.port(auth_info.conn_info.port)
.dbname(&auth_info.creds.dbname)
.user(&auth_info.creds.user);
match auth_info.auth_secret {
AuthSecret::Password(password) => {
config.password(password);
}
}
config
}
}
impl DatabaseConnInfo {
pub fn socket_addr(&self) -> anyhow::Result<SocketAddr> {
let host_port = format!("{}:{}", self.host, self.port);
host_port
.to_socket_addrs()
.with_context(|| format!("cannot resolve {} to SocketAddr", host_port))?
.next()
.ok_or_else(|| anyhow!("cannot resolve at least one SocketAddr"))
}
}

proxy/src/http.rs

@@ -1,6 +1,7 @@
use anyhow::anyhow;
use hyper::{Body, Request, Response, StatusCode};
use routerify::RouterBuilder;
use std::net::TcpListener;
use zenith_utils::http::endpoint;
use zenith_utils::http::error::ApiError;
use zenith_utils::http::json::json_response;
@@ -9,7 +10,17 @@ async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
Ok(json_response(StatusCode::OK, "")?)
}
pub fn make_router() -> RouterBuilder<hyper::Body, ApiError> {
fn make_router() -> RouterBuilder<hyper::Body, ApiError> {
let router = endpoint::make_router();
router.get("/v1/status", status_handler)
}
pub async fn thread_main(http_listener: TcpListener) -> anyhow::Result<()> {
let service = || routerify::RouterService::new(make_router().build()?);
hyper::Server::from_tcp(http_listener)?
.serve(service().map_err(|e| anyhow!(e))?)
.await?;
Ok(())
}

proxy/src/main.rs

@@ -8,18 +8,23 @@
use anyhow::bail;
use clap::{App, Arg};
use state::{ProxyConfig, ProxyState};
use std::thread;
use zenith_utils::http::endpoint;
use zenith_utils::{tcp_listener, GIT_VERSION};
mod compute;
mod mock;
mod auth;
mod db;
mod cancellation;
mod cplane_api;
mod http;
mod mgmt;
mod proxy;
mod state;
mod stream;
mod waiters;
fn main() -> anyhow::Result<()> {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
zenith_metrics::set_common_metrics_prefix("zenith_proxy");
let arg_matches = App::new("Zenith proxy/router")
.version(GIT_VERSION)
@@ -107,35 +112,19 @@ fn main() -> anyhow::Result<()> {
let http_listener = tcp_listener::bind(state.conf.http_address)?;
println!("Starting proxy on {}", state.conf.proxy_address);
let pageserver_listener = tcp_listener::bind(state.conf.proxy_address)?;
let proxy_listener = tokio::net::TcpListener::bind(state.conf.proxy_address).await?;
println!("Starting mgmt on {}", state.conf.mgmt_address);
let mgmt_listener = tcp_listener::bind(state.conf.mgmt_address)?;
let threads = [
thread::Builder::new()
.name("Http thread".into())
.spawn(move || {
let router = http::make_router();
endpoint::serve_thread_main(
router,
http_listener,
std::future::pending(), // never shut down
)
})?,
// Spawn a thread to listen for connections. It will spawn further threads
// for each connection.
thread::Builder::new()
.name("Listener thread".into())
.spawn(move || proxy::thread_main(state, pageserver_listener))?,
thread::Builder::new()
.name("Mgmt thread".into())
.spawn(move || mgmt::thread_main(state, mgmt_listener))?,
];
let http = tokio::spawn(http::thread_main(http_listener));
let proxy = tokio::spawn(proxy::thread_main(state, proxy_listener));
let mgmt = tokio::task::spawn_blocking(move || mgmt::thread_main(state, mgmt_listener));
for t in threads {
t.join().unwrap()?;
}
let _ = futures::future::try_join_all([http, proxy, mgmt])
.await?
.into_iter()
.collect::<Result<Vec<()>, _>>()?;
Ok(())
}

proxy/src/mock.rs (new file)

@@ -0,0 +1,32 @@
use bytes::Bytes;
use crate::{auth::{PlaintextStoredSecret, SecretStore, StoredSecret}, compute::ComputeProvider, cplane_api::ClientCredentials, db::DatabaseConnInfo};
pub struct MockConsole {
}
#[async_trait::async_trait]
impl SecretStore for MockConsole {
async fn get_stored_secret(&self, creds: &ClientCredentials) -> anyhow::Result<StoredSecret> {
let salt = [0; 4];
match (&creds.user[..], &creds.dbname[..]) {
("postgres", "postgres") => Ok(StoredSecret::PlaintextPassword(PlaintextStoredSecret {
salt,
hashed_salted_password: "md52fff09cd9def51601fc5445943b3a11f\0".into(),
compute_db_password: "postgres".into(),
})),
_ => unimplemented!()
}
}
}
#[async_trait::async_trait]
impl ComputeProvider for MockConsole {
async fn get_compute_node(&self, creds: &ClientCredentials) -> anyhow::Result<DatabaseConnInfo> {
return Ok(DatabaseConnInfo {
host: "127.0.0.1".into(),
port: 5432,
})
}
}

proxy/src/proxy.rs

@@ -1,294 +1,185 @@
use crate::cplane_api::{CPlaneApi, DatabaseInfo};
use crate::auth::{self, StoredSecret, SecretStore};
use crate::cancellation::{self, CancelClosure};
use crate::compute::ComputeProvider;
use crate::cplane_api as cplane;
use crate::db::{AuthSecret, DatabaseAuthInfo};
use crate::mock::MockConsole;
use crate::state::SslConfig;
use crate::stream::{PqStream, Stream};
use crate::ProxyState;
use anyhow::{anyhow, bail, Context};
use anyhow::{bail, Context};
use lazy_static::lazy_static;
use parking_lot::Mutex;
use rand::prelude::StdRng;
use rand::{Rng, SeedableRng};
use std::cell::Cell;
use std::collections::HashMap;
use std::net::{SocketAddr, TcpStream};
use std::{io, thread};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio_postgres::NoTls;
use zenith_metrics::{new_common_metric_name, register_int_counter, IntCounter};
use zenith_utils::postgres_backend::{self, PostgresBackend, ProtoState, Stream};
use zenith_utils::pq_proto::{BeMessage as Be, FeMessage as Fe, *};
use zenith_utils::sock_split::{ReadStream, WriteStream};
struct CancelClosure {
socket_addr: SocketAddr,
cancel_token: tokio_postgres::CancelToken,
}
impl CancelClosure {
async fn try_cancel_query(&self) {
if let Ok(socket) = tokio::net::TcpStream::connect(self.socket_addr).await {
// NOTE ignoring the result because:
// 1. This is a best effort attempt, the database doesn't have to listen
// 2. Being opaque about errors here helps avoid leaking info to unauthenticated user
let _ = self.cancel_token.cancel_query_raw(socket, NoTls).await;
}
}
}
use zenith_utils::pq_proto::{BeMessage as Be, *};
lazy_static! {
// Enables serving CancelRequests
static ref CANCEL_MAP: Mutex<HashMap<CancelKeyData, CancelClosure>> = Mutex::new(HashMap::new());
// Metrics
static ref NUM_CONNECTIONS_ACCEPTED_COUNTER: IntCounter = register_int_counter!(
new_common_metric_name("num_connections_accepted"),
"Number of TCP client connections accepted."
).unwrap();
)
.unwrap();
static ref NUM_CONNECTIONS_CLOSED_COUNTER: IntCounter = register_int_counter!(
new_common_metric_name("num_connections_closed"),
"Number of TCP client connections closed."
).unwrap();
static ref NUM_CONNECTIONS_FAILED_COUNTER: IntCounter = register_int_counter!(
new_common_metric_name("num_connections_failed"),
"Number of TCP client connections that closed due to error."
).unwrap();
)
.unwrap();
static ref NUM_BYTES_PROXIED_COUNTER: IntCounter = register_int_counter!(
new_common_metric_name("num_bytes_proxied"),
"Number of bytes sent/received between any client and backend."
).unwrap();
)
.unwrap();
}
thread_local! {
// Used to clean up the CANCEL_MAP. Might not be necessary if we use tokio thread pool in main loop.
static THREAD_CANCEL_KEY_DATA: Cell<Option<CancelKeyData>> = Cell::new(None);
}
///
/// Main proxy listener loop.
///
/// Listens for connections, and launches a new handler thread for each.
///
pub fn thread_main(
pub async fn thread_main(
state: &'static ProxyState,
listener: std::net::TcpListener,
listener: tokio::net::TcpListener,
) -> anyhow::Result<()> {
loop {
let (socket, peer_addr) = listener.accept()?;
let (socket, peer_addr) = listener.accept().await?;
println!("accepted connection from {}", peer_addr);
NUM_CONNECTIONS_ACCEPTED_COUNTER.inc();
socket.set_nodelay(true).unwrap();
// TODO Use a threadpool instead. Maybe use tokio's threadpool by
// spawning a future into its runtime. Tokio's JoinError should
// allow us to handle cleanup properly even if the future panics.
thread::Builder::new()
.name("Proxy thread".into())
.spawn(move || {
if let Err(err) = proxy_conn_main(state, socket) {
NUM_CONNECTIONS_FAILED_COUNTER.inc();
println!("error: {}", err);
}
tokio::spawn(log_error(async {
socket
.set_nodelay(true)
.context("failed to set socket option")?;
// Clean up CANCEL_MAP.
NUM_CONNECTIONS_CLOSED_COUNTER.inc();
THREAD_CANCEL_KEY_DATA.with(|cell| {
if let Some(cancel_key_data) = cell.get() {
CANCEL_MAP.lock().remove(&cancel_key_data);
};
});
})?;
let tls = state.conf.ssl_config.clone();
handle_client(socket, tls).await
}));
}
}
// TODO: clean up fields
struct ProxyConnection {
state: &'static ProxyState,
psql_session_id: String,
pgb: PostgresBackend,
async fn log_error<R, F>(future: F) -> F::Output
where
F: std::future::Future<Output = anyhow::Result<R>>,
{
future.await.map_err(|err| {
println!("error: {}", err.to_string());
err
})
}
pub fn proxy_conn_main(state: &'static ProxyState, socket: TcpStream) -> anyhow::Result<()> {
let conn = ProxyConnection {
state,
psql_session_id: hex::encode(rand::random::<[u8; 8]>()),
pgb: PostgresBackend::new(
socket,
postgres_backend::AuthType::MD5,
state.conf.ssl_config.clone(),
false,
)?,
};
let (client, server) = match conn.handle_client()? {
Some(x) => x,
None => return Ok(()),
};
let server = zenith_utils::sock_split::BidiStream::from_tcp(server);
let client = match client {
Stream::Bidirectional(bidi_stream) => bidi_stream,
_ => panic!("invalid stream type"),
};
proxy(client.split(), server.split())
}
impl ProxyConnection {
/// Returns Ok(None) when connection was successfully closed.
fn handle_client(mut self) -> anyhow::Result<Option<(Stream, TcpStream)>> {
let mut authenticate = || {
let (username, dbname) = match self.handle_startup()? {
Some(x) => x,
None => return Ok(None),
};
// Both scenarios here should end up producing database credentials
if username.ends_with("@zenith") {
self.handle_existing_user(&username, &dbname).map(Some)
} else {
self.handle_new_user().map(Some)
}
};
let conn = match authenticate() {
Ok(Some(db_info)) => connect_to_db(db_info),
Ok(None) => return Ok(None),
Err(e) => {
// Report the error to the client
self.pgb.write_message(&Be::ErrorResponse(&e.to_string()))?;
bail!("failed to handle client: {:?}", e);
}
};
// We'll get rid of this once migration to async is complete
let (pg_version, db_stream) = {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let (pg_version, stream, cancel_key_data) = runtime.block_on(conn)?;
self.pgb
.write_message(&BeMessage::BackendKeyData(cancel_key_data))?;
let stream = stream.into_std()?;
stream.set_nonblocking(false)?;
(pg_version, stream)
};
// Let the client send new requests
self.pgb
.write_message_noflush(&BeMessage::ParameterStatus(
BeParameterStatusMessage::ServerVersion(&pg_version),
))?
.write_message(&Be::ReadyForQuery)?;
Ok(Some((self.pgb.into_stream(), db_stream)))
async fn handle_client(
stream: impl AsyncRead + AsyncWrite + Unpin,
tls: Option<SslConfig>,
) -> anyhow::Result<()> {
// The `closed` counter will increase when this future is destroyed.
NUM_CONNECTIONS_ACCEPTED_COUNTER.inc();
scopeguard::defer! {
NUM_CONNECTIONS_CLOSED_COUNTER.inc();
}
/// Returns Ok(None) when connection was successfully closed.
fn handle_startup(&mut self) -> anyhow::Result<Option<(String, String)>> {
let have_tls = self.pgb.tls_config.is_some();
let mut encrypted = false;
if let Some((stream, creds)) = handshake(stream, tls).await? {
cancellation::with_session(|session| async {
connect_client_to_db(stream, creds, session).await
})
.await?;
}
loop {
let msg = match self.pgb.read_message()? {
Some(Fe::StartupPacket(msg)) => msg,
None => bail!("connection is lost"),
bad => bail!("unexpected message type: {:?}", bad),
};
println!("got message: {:?}", msg);
Ok(())
}
match msg {
FeStartupPacket::GssEncRequest => {
self.pgb.write_message(&Be::EncryptionResponse(false))?;
}
FeStartupPacket::SslRequest => {
self.pgb.write_message(&Be::EncryptionResponse(have_tls))?;
if have_tls {
self.pgb.start_tls()?;
encrypted = true;
/// Handle a connection from one client.
/// For better testing experience, `stream` can be
/// any object satisfying the traits.
async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
stream: S,
mut tls: Option<SslConfig>,
) -> anyhow::Result<Option<(PqStream<Stream<S>>, cplane::ClientCredentials)>> {
// Client may try upgrading to each protocol only once
let (mut tried_ssl, mut tried_gss) = (false, false);
let mut stream = PqStream::new(Stream::from_raw(stream));
loop {
let msg = stream.read_startup_packet().await?;
println!("got message: {:?}", msg);
use FeStartupPacket::*;
match msg {
SslRequest => match stream.get_ref() {
Stream::Raw { .. } if !tried_ssl => {
tried_ssl = true;
// We can't perform TLS handshake without a config
let enc = tls.is_some();
stream.write_message(&Be::EncryptionResponse(enc)).await?;
if let Some(tls) = tls.take() {
// Upgrade raw stream into a secure TLS-backed stream.
// NOTE: We've consumed `tls`; this fact will be used later.
stream = PqStream::new(stream.into_inner().upgrade(tls).await?);
}
}
FeStartupPacket::StartupMessage { mut params, .. } => {
if have_tls && !encrypted {
bail!("must connect with TLS");
}
_ => bail!("protocol violation"),
},
GssEncRequest => match stream.get_ref() {
Stream::Raw { .. } if !tried_gss => {
tried_gss = true;
let mut get_param = |key| {
params
.remove(key)
.with_context(|| format!("{} is missing in startup packet", key))
};
// Currently, we don't support GSSAPI
stream.write_message(&Be::EncryptionResponse(false)).await?;
}
_ => bail!("protocol violation"),
},
StartupMessage { params, .. } => {
// Check that the config has been consumed during upgrade
// OR we didn't provide it at all (for dev purposes).
if tls.is_some() {
let msg = "connection is insecure (try using `sslmode=require`)";
stream.write_message(&Be::ErrorResponse(msg)).await?;
bail!(msg);
}
return Ok(Some((get_param("user")?, get_param("database")?)));
}
FeStartupPacket::CancelRequest(cancel_key_data) => {
if let Some(cancel_closure) = CANCEL_MAP.lock().get(&cancel_key_data) {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(cancel_closure.try_cancel_query());
}
return Ok(None);
}
break Ok(Some((stream, params.try_into()?)));
}
CancelRequest(cancel_key_data) => {
cancellation::cancel_session(cancel_key_data).await?;
break Ok(None);
}
}
}
}
fn handle_existing_user(&mut self, user: &str, db: &str) -> anyhow::Result<DatabaseInfo> {
let md5_salt = rand::random::<[u8; 4]>();
async fn connect_client_to_db(
mut client: PqStream<impl AsyncRead + AsyncWrite + Unpin>,
creds: cplane::ClientCredentials,
session: cancellation::Session,
) -> anyhow::Result<()> {
// Authenticate
// TODO use real console
let console = MockConsole {};
let stored_secret = console.get_stored_secret(&creds).await?;
let auth_secret = auth::authenticate(&mut client, stored_secret).await?;
let conn_info = console.get_compute_node(&creds).await?;
let db_auth_info = DatabaseAuthInfo {
conn_info,
creds,
auth_secret,
};
// Ask password
self.pgb
.write_message(&Be::AuthenticationMD5Password(&md5_salt))?;
self.pgb.state = ProtoState::Authentication; // XXX
// Connect to db
let (mut db, version, cancel_closure) = connect_to_db(db_auth_info).await?;
let cancel_key_data = session.enable_cancellation(cancel_closure);
// Check password
let msg = match self.pgb.read_message()? {
Some(Fe::PasswordMessage(msg)) => msg,
None => bail!("connection is lost"),
bad => bail!("unexpected message type: {:?}", bad),
};
println!("got message: {:?}", msg);
// Report success to client
client
.write_message_noflush(&Be::AuthenticationOk)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?
.write_message_noflush(&BeMessage::ParameterStatus(
BeParameterStatusMessage::ServerVersion(&version),
))?
.write_message_noflush(&Be::BackendKeyData(cancel_key_data))?
.write_message(&BeMessage::ReadyForQuery)
.await?;
let (_trailing_null, md5_response) = msg
.split_last()
.ok_or_else(|| anyhow!("unexpected password message"))?;
let mut client = client.into_inner();
let _ = tokio::io::copy_bidirectional(&mut client, &mut db).await?;
let cplane = CPlaneApi::new(&self.state.conf.auth_endpoint, &self.state.waiters);
let db_info = cplane.authenticate_proxy_request(
user,
db,
md5_response,
&md5_salt,
&self.psql_session_id,
)?;
self.pgb
.write_message_noflush(&Be::AuthenticationOk)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?;
Ok(db_info)
}
fn handle_new_user(&mut self) -> anyhow::Result<DatabaseInfo> {
let greeting = hello_message(&self.state.conf.redirect_uri, &self.psql_session_id);
// First, register this session
let waiter = self.state.waiters.register(self.psql_session_id.clone());
// Give user a URL to spawn a new database
self.pgb
.write_message_noflush(&Be::AuthenticationOk)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?
.write_message(&Be::NoticeResponse(greeting))?;
// Wait for web console response
let db_info = waiter.wait()?.map_err(|e| anyhow!(e))?;
self.pgb
.write_message_noflush(&Be::NoticeResponse("Connecting to database.".into()))?;
Ok(db_info)
}
Ok(())
}
fn hello_message(redirect_uri: &str, session_id: &str) -> String {
@@ -306,81 +197,147 @@ fn hello_message(redirect_uri: &str, session_id: &str) -> String {
)
}
/// Create a TCP connection to a postgres database, authenticate with it, and receive the ReadyForQuery message
/// Connect to a corresponding compute node.
async fn connect_to_db(
db_info: DatabaseInfo,
) -> anyhow::Result<(String, tokio::net::TcpStream, CancelKeyData)> {
// Make raw connection. When connect_raw finishes we've received ReadyForQuery.
let socket_addr = db_info.socket_addr()?;
let mut socket = tokio::net::TcpStream::connect(socket_addr).await?;
let config = tokio_postgres::Config::from(db_info);
// NOTE We effectively ignore some ParameterStatus and NoticeResponse
// messages here. Not sure if that could break something.
let (client, conn) = config.connect_raw(&mut socket, NoTls).await?;
db_info: DatabaseAuthInfo,
) -> anyhow::Result<(TcpStream, String, CancelClosure)> {
// TODO: establish a secure connection to the DB
let socket_addr = db_info.conn_info.socket_addr()?;
let mut socket = TcpStream::connect(socket_addr).await?;
// Save info for potentially cancelling the query later
let mut rng = StdRng::from_entropy();
let cancel_key_data = CancelKeyData {
// HACK We'd rather get the real backend_pid but tokio_postgres doesn't
// expose it and we don't want to do another roundtrip to query
// for it. The client will be able to notice that this is not the
// actual backend_pid, but backend_pid is not used for anything
// so it doesn't matter.
backend_pid: rng.gen(),
cancel_key: rng.gen(),
};
let cancel_closure = CancelClosure {
socket_addr,
cancel_token: client.cancel_token(),
};
CANCEL_MAP.lock().insert(cancel_key_data, cancel_closure);
THREAD_CANCEL_KEY_DATA.with(|cell| {
let prev_value = cell.replace(Some(cancel_key_data));
assert!(
prev_value.is_none(),
"THREAD_CANCEL_KEY_DATA was already set"
);
});
let (client, conn) = tokio_postgres::Config::from(db_info)
.connect_raw(&mut socket, NoTls)
.await?;
let version = conn.parameter("server_version").unwrap();
Ok((version.into(), socket, cancel_key_data))
let version = conn
.parameter("server_version")
.context("failed to fetch postgres server version")?
.into();
let cancel_closure = CancelClosure::new(socket_addr, client.cancel_token());
Ok((socket, version, cancel_closure))
}
/// Concurrently proxy both directions of the client and server connections
fn proxy(
(client_read, client_write): (ReadStream, WriteStream),
(server_read, server_write): (ReadStream, WriteStream),
) -> anyhow::Result<()> {
fn do_proxy(mut reader: impl io::Read, mut writer: WriteStream) -> io::Result<u64> {
/// FlushWriter will make sure that every message is sent as soon as possible
struct FlushWriter<W>(W);
#[cfg(test)]
mod tests {
use super::*;
impl<W: io::Write> io::Write for FlushWriter<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
// `std::io::copy` is guaranteed to exit if we return an error,
// so we can afford to lose `res` in case `flush` fails
let res = self.0.write(buf);
if let Ok(count) = res {
NUM_BYTES_PROXIED_COUNTER.inc_by(count as u64);
self.flush()?;
}
res
}
use tokio::io::DuplexStream;
use tokio_postgres::config::SslMode;
use tokio_postgres::tls::MakeTlsConnect;
use tokio_postgres_rustls::MakeRustlsConnect;
fn flush(&mut self) -> io::Result<()> {
self.0.flush()
}
}
async fn dummy_proxy(
client: impl AsyncRead + AsyncWrite + Unpin,
tls: Option<SslConfig>,
) -> anyhow::Result<()> {
// TODO: add some infra + tests for credentials
let (mut stream, _creds) = handshake(client, tls).await?.context("no stream")?;
let res = std::io::copy(&mut reader, &mut FlushWriter(&mut writer));
writer.shutdown(std::net::Shutdown::Both)?;
res
stream
.write_message_noflush(&Be::AuthenticationOk)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?
.write_message(&BeMessage::ReadyForQuery)
.await?;
Ok(())
}
let client_to_server_jh = thread::spawn(move || do_proxy(client_read, server_write));
fn generate_certs(
hostname: &str,
) -> anyhow::Result<(rustls::Certificate, rustls::Certificate, rustls::PrivateKey)> {
let ca = rcgen::Certificate::from_params({
let mut params = rcgen::CertificateParams::default();
params.is_ca = rcgen::IsCa::Ca(rcgen::BasicConstraints::Unconstrained);
params
})?;
do_proxy(server_read, client_write)?;
client_to_server_jh.join().unwrap()?;
let cert = rcgen::generate_simple_self_signed(vec![hostname.into()])?;
Ok((
rustls::Certificate(ca.serialize_der()?),
rustls::Certificate(cert.serialize_der_with_signer(&ca)?),
rustls::PrivateKey(cert.serialize_private_key_der()),
))
}
Ok(())
#[tokio::test]
async fn handshake_tls_is_enforced_by_proxy() -> anyhow::Result<()> {
let (client, server) = tokio::io::duplex(1024);
let server_config = {
let (_ca, cert, key) = generate_certs("localhost")?;
let mut config = rustls::ServerConfig::new(rustls::NoClientAuth::new());
config.set_single_cert(vec![cert], key)?;
config
};
let proxy = tokio::spawn(dummy_proxy(client, Some(server_config.into())));
tokio_postgres::Config::new()
.user("john_doe")
.dbname("earth")
.ssl_mode(SslMode::Disable)
.connect_raw(server, NoTls)
.await
.err() // -> Option<E>
.context("client shouldn't be able to connect")?;
proxy
.await?
.err() // -> Option<E>
.context("server shouldn't accept client")?;
Ok(())
}
#[tokio::test]
async fn handshake_tls() -> anyhow::Result<()> {
let (client, server) = tokio::io::duplex(1024);
let (ca, cert, key) = generate_certs("localhost")?;
let server_config = {
let mut config = rustls::ServerConfig::new(rustls::NoClientAuth::new());
config.set_single_cert(vec![cert], key)?;
config
};
let proxy = tokio::spawn(dummy_proxy(client, Some(server_config.into())));
let client_config = {
let mut config = rustls::ClientConfig::new();
config.root_store.add(&ca)?;
config
};
let mut mk = MakeRustlsConnect::new(client_config);
let tls = MakeTlsConnect::<DuplexStream>::make_tls_connect(&mut mk, "localhost")?;
let (_client, _conn) = tokio_postgres::Config::new()
.user("john_doe")
.dbname("earth")
.ssl_mode(SslMode::Require)
.connect_raw(server, tls)
.await?;
proxy.await?
}
#[tokio::test]
async fn handshake_raw() -> anyhow::Result<()> {
let (client, server) = tokio::io::duplex(1024);
let proxy = tokio::spawn(dummy_proxy(client, None));
let (_client, _conn) = tokio_postgres::Config::new()
.user("john_doe")
.dbname("earth")
.ssl_mode(SslMode::Prefer)
.connect_raw(server, NoTls)
.await?;
proxy.await?
}
}

proxy/src/stream.rs (new file)

@@ -0,0 +1,166 @@
use bytes::BytesMut;
use pin_project_lite::pin_project;
use rustls::ServerConfig;
use std::pin::Pin;
use std::sync::Arc;
use std::{io, task};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio_rustls::server::TlsStream;
use zenith_utils::pq_proto::{BeMessage, FeMessage, FeStartupPacket};
pin_project! {
/// Stream wrapper which implements libpq's protocol.
/// NOTE: This object deliberately doesn't implement [`AsyncRead`]
/// or [`AsyncWrite`] to prevent subtle errors (e.g. trying
/// to pass random malformed bytes through the connection).
pub struct PqStream<S> {
#[pin]
stream: S,
buffer: BytesMut,
}
}
impl<S> PqStream<S> {
/// Construct a new libpq protocol wrapper.
pub fn new(stream: S) -> Self {
Self {
stream,
buffer: Default::default(),
}
}
/// Extract the underlying stream.
pub fn into_inner(self) -> S {
self.stream
}
/// Get a reference to the underlying stream.
pub fn get_ref(&self) -> &S {
&self.stream
}
}
impl<S: AsyncRead + Unpin> PqStream<S> {
/// Receive [`FeStartupPacket`], which is the first packet sent by a client.
pub async fn read_startup_packet(&mut self) -> anyhow::Result<FeStartupPacket> {
match FeStartupPacket::read_fut(&mut self.stream).await? {
Some(FeMessage::StartupPacket(packet)) => Ok(packet),
None => anyhow::bail!("connection is lost"),
other => anyhow::bail!("bad message type: {:?}", other),
}
}
pub async fn read_password_message(&mut self) -> anyhow::Result<bytes::Bytes> {
match FeMessage::read_fut(&mut self.stream).await? {
Some(FeMessage::PasswordMessage(msg)) => Ok(msg),
None => anyhow::bail!("connection is lost"),
other => anyhow::bail!("bad message type: {:?}", other),
}
}
}
impl<S: AsyncWrite + Unpin> PqStream<S> {
/// Write the message into an internal buffer, but don't flush the underlying stream.
pub fn write_message_noflush<'a>(&mut self, message: &BeMessage<'a>) -> io::Result<&mut Self> {
BeMessage::write(&mut self.buffer, message)?;
Ok(self)
}
/// Write the message into an internal buffer and flush it.
pub async fn write_message<'a>(&mut self, message: &BeMessage<'a>) -> io::Result<&mut Self> {
self.write_message_noflush(message)?;
self.flush().await?;
Ok(self)
}
/// Flush the output buffer into the underlying stream.
pub async fn flush(&mut self) -> io::Result<&mut Self> {
self.stream.write_all(&self.buffer).await?;
self.buffer.clear();
self.stream.flush().await?;
Ok(self)
}
}
pin_project! {
/// Wrapper for upgrading raw streams into secure streams.
/// NOTE: it should be possible to decompose this object as necessary.
#[project = StreamProj]
pub enum Stream<S> {
/// We always begin with a raw stream,
/// which may then be upgraded into a secure stream.
Raw { #[pin] raw: S },
/// We box [`TlsStream`] since it can be quite large.
Tls { #[pin] tls: Box<TlsStream<S>> },
}
}
impl<S> Stream<S> {
/// Construct a new instance from a raw stream.
pub fn from_raw(raw: S) -> Self {
Self::Raw { raw }
}
}
impl<S: AsyncRead + AsyncWrite + Unpin> Stream<S> {
/// If possible, upgrade raw stream into a secure TLS-based stream.
pub async fn upgrade(self, cfg: Arc<ServerConfig>) -> anyhow::Result<Self> {
match self {
Stream::Raw { raw } => {
let tls = Box::new(tokio_rustls::TlsAcceptor::from(cfg).accept(raw).await?);
Ok(Stream::Tls { tls })
}
Stream::Tls { .. } => anyhow::bail!("can't upgrade TLS stream"),
}
}
}
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for Stream<S> {
fn poll_read(
self: Pin<&mut Self>,
context: &mut task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> task::Poll<io::Result<()>> {
use StreamProj::*;
match self.project() {
Raw { raw } => raw.poll_read(context, buf),
Tls { tls } => tls.poll_read(context, buf),
}
}
}
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for Stream<S> {
fn poll_write(
self: Pin<&mut Self>,
context: &mut task::Context<'_>,
buf: &[u8],
) -> task::Poll<io::Result<usize>> {
use StreamProj::*;
match self.project() {
Raw { raw } => raw.poll_write(context, buf),
Tls { tls } => tls.poll_write(context, buf),
}
}
fn poll_flush(
self: Pin<&mut Self>,
context: &mut task::Context<'_>,
) -> task::Poll<io::Result<()>> {
use StreamProj::*;
match self.project() {
Raw { raw } => raw.poll_flush(context),
Tls { tls } => tls.poll_flush(context),
}
}
fn poll_shutdown(
self: Pin<&mut Self>,
context: &mut task::Context<'_>,
) -> task::Poll<io::Result<()>> {
use StreamProj::*;
match self.project() {
Raw { raw } => raw.poll_shutdown(context),
Tls { tls } => tls.poll_shutdown(context),
}
}
}

zenith_utils/src/pq_proto.rs

@@ -57,6 +57,16 @@ pub struct CancelKeyData {
pub cancel_key: i32,
}
use rand::distributions::{Distribution, Standard};
impl Distribution<CancelKeyData> for Standard {
fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> CancelKeyData {
CancelKeyData {
backend_pid: rng.gen(),
cancel_key: rng.gen(),
}
}
}
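A one-line sketch of what this impl enables (proxy's cancellation code uses it to pick ephemeral session keys):

// Sketch only: Standard can now sample CancelKeyData, so a random key is simply
fn random_cancel_key() -> CancelKeyData {
    rand::random()
}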
#[derive(Debug)]
pub struct FeQueryMessage {
pub body: Bytes,