mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 03:30:36 +00:00
Compare commits
5 Commits
release-pr
...
heikki/psq
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9e79873c9e | ||
|
|
52c2083b52 | ||
|
|
ae25d4ab35 | ||
|
|
620efed7f6 | ||
|
|
45ca653d9c |
255
proxy/src/bin/pg_sni_router.rs
Normal file
255
proxy/src/bin/pg_sni_router.rs
Normal file
@@ -0,0 +1,255 @@
|
||||
/// A stand-alone program that routes connections, e.g. from
|
||||
/// `aaa--bbb--123.external.domain` to `aaa.bbb.123.internal.domain`.
|
||||
///
|
||||
/// This allows connecting to pods/services running in the same Kubernetes cluster from
|
||||
/// the outside. Similar to an ingress controller for HTTPS.
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context};
|
||||
use clap::{self, Arg};
|
||||
use futures::TryFutureExt;
|
||||
use proxy::console::messages::MetricsAuxInfo;
|
||||
use proxy::stream::{PqStream, Stream};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::{project_git_version, sentry_init::init_sentry};
|
||||
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
|
||||
fn cli() -> clap::Command {
|
||||
clap::Command::new("Neon proxy/router")
|
||||
.version(GIT_VERSION)
|
||||
.arg(
|
||||
Arg::new("listen")
|
||||
.short('l')
|
||||
.long("listen")
|
||||
.help("listen for incoming client connections on ip:port")
|
||||
.default_value("127.0.0.1:4432"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("tls-key")
|
||||
.short('k')
|
||||
.long("tls-key")
|
||||
.help("path to TLS key for client postgres connections")
|
||||
.required(true),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("tls-cert")
|
||||
.short('c')
|
||||
.long("tls-cert")
|
||||
.help("path to TLS cert for client postgres connections")
|
||||
.required(true),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("dest")
|
||||
.short('d')
|
||||
.long("destination")
|
||||
.help("append this domain zone to the SNI hostname to get the destination address")
|
||||
.required(true),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("dest-port")
|
||||
.long("destination-port")
|
||||
.help("destination port to connect to")
|
||||
.default_value("5432")
|
||||
.value_parser(clap::value_parser!(u16)),
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let _logging_guard = proxy::logging::init().await?;
|
||||
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
|
||||
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
|
||||
|
||||
let args = cli().get_matches();
|
||||
let destination: String = args.get_one::<String>("dest").unwrap().parse()?;
|
||||
let destination_port: u16 = *args.get_one::<u16>("dest-port").unwrap();
|
||||
|
||||
// Configure TLS
|
||||
let tls_config: Arc<rustls::ServerConfig> = match (
|
||||
args.get_one::<String>("tls-key"),
|
||||
args.get_one::<String>("tls-cert"),
|
||||
) {
|
||||
(Some(key_path), Some(cert_path)) => {
|
||||
let key = {
|
||||
let key_bytes = std::fs::read(key_path).context("TLS key file")?;
|
||||
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..])
|
||||
.context(format!("Failed to read TLS keys at '{key_path}'"))?;
|
||||
|
||||
ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len());
|
||||
keys.pop().map(rustls::PrivateKey).unwrap()
|
||||
};
|
||||
|
||||
let cert_chain_bytes = std::fs::read(cert_path)
|
||||
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
|
||||
|
||||
let cert_chain = {
|
||||
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
|
||||
.context(format!(
|
||||
"Failed to read TLS certificate chain from bytes from file at '{cert_path}'."
|
||||
))?
|
||||
.into_iter()
|
||||
.map(rustls::Certificate)
|
||||
.collect()
|
||||
};
|
||||
|
||||
rustls::ServerConfig::builder()
|
||||
.with_safe_default_cipher_suites()
|
||||
.with_safe_default_kx_groups()
|
||||
.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])?
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(cert_chain, key)?
|
||||
.into()
|
||||
}
|
||||
_ => bail!("tls-key and tls-cert must be specified"),
|
||||
};
|
||||
|
||||
// Start listening for incoming client connections
|
||||
let proxy_address: SocketAddr = args.get_one::<String>("listen").unwrap().parse()?;
|
||||
info!("Starting proxy on {proxy_address}");
|
||||
let proxy_listener = TcpListener::bind(proxy_address).await?;
|
||||
|
||||
let cancellation_token = CancellationToken::new();
|
||||
let tasks = vec![
|
||||
tokio::spawn(proxy::handle_signals(cancellation_token.clone())),
|
||||
tokio::spawn(task_main(
|
||||
Arc::new(destination),
|
||||
destination_port,
|
||||
tls_config,
|
||||
proxy_listener,
|
||||
cancellation_token.clone(),
|
||||
)),
|
||||
];
|
||||
|
||||
let _tasks = futures::future::try_join_all(tasks.into_iter().map(proxy::flatten_err)).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn task_main(
|
||||
dest_suffix: Arc<String>,
|
||||
dest_port: u16,
|
||||
tls_config: Arc<rustls::ServerConfig>,
|
||||
listener: tokio::net::TcpListener,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
scopeguard::defer! {
|
||||
info!("proxy has shut down");
|
||||
}
|
||||
|
||||
// When set for the server socket, the keepalive setting
|
||||
// will be inherited by all accepted client sockets.
|
||||
socket2::SockRef::from(&listener).set_keepalive(true)?;
|
||||
|
||||
let mut connections = tokio::task::JoinSet::new();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
accept_result = listener.accept() => {
|
||||
let (socket, peer_addr) = accept_result?;
|
||||
info!("accepted postgres client connection from {peer_addr}");
|
||||
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
let tls_config = Arc::clone(&tls_config);
|
||||
let dest_suffix = Arc::clone(&dest_suffix);
|
||||
|
||||
connections.spawn(
|
||||
async move {
|
||||
info!("spawned a task for {peer_addr}");
|
||||
|
||||
socket
|
||||
.set_nodelay(true)
|
||||
.context("failed to set socket option")?;
|
||||
|
||||
handle_client(dest_suffix, dest_port, tls_config, session_id, socket).await
|
||||
}
|
||||
.unwrap_or_else(|e| {
|
||||
// Acknowledge that the task has finished with an error.
|
||||
error!("per-client task finished with an error: {e:#}");
|
||||
}),
|
||||
);
|
||||
}
|
||||
_ = cancellation_token.cancelled() => {
|
||||
drop(listener);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Drain connections
|
||||
while let Some(res) = connections.join_next().await {
|
||||
if let Err(e) = res {
|
||||
if !e.is_panic() && !e.is_cancelled() {
|
||||
warn!("unexpected error from joined connection task: {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
|
||||
|
||||
async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
raw_stream: S,
|
||||
tls_config: Arc<rustls::ServerConfig>,
|
||||
) -> anyhow::Result<Stream<S>> {
|
||||
let mut stream = PqStream::new(Stream::from_raw(raw_stream));
|
||||
|
||||
let msg = stream.read_startup_packet().await?;
|
||||
info!("received {msg:?}");
|
||||
use pq_proto::FeStartupPacket::*;
|
||||
|
||||
match msg {
|
||||
SslRequest => {
|
||||
stream
|
||||
.write_message(&pq_proto::BeMessage::EncryptionResponse(true))
|
||||
.await?;
|
||||
// Upgrade raw stream into a secure TLS-backed stream.
|
||||
// NOTE: We've consumed `tls`; this fact will be used later.
|
||||
|
||||
let (raw, read_buf) = stream.into_inner();
|
||||
// TODO: Normally, client doesn't send any data before
|
||||
// server says TLS handshake is ok and read_buf is empy.
|
||||
// However, you could imagine pipelining of postgres
|
||||
// SSLRequest + TLS ClientHello in one hunk similar to
|
||||
// pipelining in our node js driver. We should probably
|
||||
// support that by chaining read_buf with the stream.
|
||||
if !read_buf.is_empty() {
|
||||
bail!("data is sent before server replied with EncryptionResponse");
|
||||
}
|
||||
Ok(raw.upgrade(tls_config).await?)
|
||||
}
|
||||
_ => stream.throw_error_str(ERR_INSECURE_CONNECTION).await?,
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(fields(session_id = ?session_id), skip_all)]
|
||||
async fn handle_client(
|
||||
dest_suffix: Arc<String>,
|
||||
dest_port: u16,
|
||||
tls_config: Arc<rustls::ServerConfig>,
|
||||
session_id: uuid::Uuid,
|
||||
stream: impl AsyncRead + AsyncWrite + Unpin,
|
||||
) -> anyhow::Result<()> {
|
||||
let tls_stream = ssl_handshake(stream, tls_config).await?;
|
||||
|
||||
// cut off first part of the sni domain
|
||||
let sni = tls_stream.sni_hostname().ok_or(anyhow!("SNI missing"))?;
|
||||
let dest = sni
|
||||
.split_once('.')
|
||||
.context("invalid sni")?
|
||||
.0
|
||||
.replace("--", ".");
|
||||
|
||||
let destination = format!("{}.{}:{}", dest, dest_suffix, dest_port);
|
||||
|
||||
info!("destination: {}:{}", destination, dest_port);
|
||||
|
||||
let client = tokio::net::TcpStream::connect(destination).await?;
|
||||
|
||||
let metrics_aux: MetricsAuxInfo = Default::default();
|
||||
proxy::proxy::proxy_pass(tls_stream, client, &metrics_aux).await
|
||||
}
|
||||
@@ -1,49 +1,22 @@
|
||||
//! Postgres protocol proxy/router.
|
||||
//!
|
||||
//! This service listens psql port and can check auth via external service
|
||||
//! (control plane API in our case) and can create new databases and accounts
|
||||
//! in somewhat transparent manner (again via communication with control plane API).
|
||||
use proxy::auth;
|
||||
use proxy::console;
|
||||
use proxy::http;
|
||||
use proxy::metrics;
|
||||
|
||||
mod auth;
|
||||
mod cache;
|
||||
mod cancellation;
|
||||
mod compute;
|
||||
mod config;
|
||||
mod console;
|
||||
mod error;
|
||||
mod http;
|
||||
mod logging;
|
||||
mod metrics;
|
||||
mod parse;
|
||||
mod proxy;
|
||||
mod sasl;
|
||||
mod scram;
|
||||
mod stream;
|
||||
mod url;
|
||||
mod waiters;
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
use anyhow::bail;
|
||||
use clap::{self, Arg};
|
||||
use config::ProxyConfig;
|
||||
use futures::FutureExt;
|
||||
use std::{borrow::Cow, future::Future, net::SocketAddr};
|
||||
use tokio::{net::TcpListener, task::JoinError};
|
||||
use proxy::config::{self, ProxyConfig};
|
||||
use std::{borrow::Cow, net::SocketAddr};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info, warn};
|
||||
use tracing::info;
|
||||
use utils::{project_git_version, sentry_init::init_sentry};
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
|
||||
/// Flattens `Result<Result<T>>` into `Result<T>`.
|
||||
async fn flatten_err(
|
||||
f: impl Future<Output = Result<anyhow::Result<()>, JoinError>>,
|
||||
) -> anyhow::Result<()> {
|
||||
f.map(|r| r.context("join error").and_then(|x| x)).await
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let _logging_guard = logging::init().await?;
|
||||
let _logging_guard = proxy::logging::init().await?;
|
||||
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
|
||||
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
|
||||
|
||||
@@ -69,7 +42,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
let proxy_listener = TcpListener::bind(proxy_address).await?;
|
||||
let cancellation_token = CancellationToken::new();
|
||||
|
||||
let mut client_tasks = vec![tokio::spawn(proxy::task_main(
|
||||
let mut client_tasks = vec![tokio::spawn(proxy::proxy::task_main(
|
||||
config,
|
||||
proxy_listener,
|
||||
cancellation_token.clone(),
|
||||
@@ -88,7 +61,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
let mut tasks = vec![
|
||||
tokio::spawn(handle_signals(cancellation_token)),
|
||||
tokio::spawn(proxy::handle_signals(cancellation_token)),
|
||||
tokio::spawn(http::server::task_main(http_listener)),
|
||||
tokio::spawn(console::mgmt::task_main(mgmt_listener)),
|
||||
];
|
||||
@@ -97,8 +70,9 @@ async fn main() -> anyhow::Result<()> {
|
||||
tasks.push(tokio::spawn(metrics::task_main(metrics_config)));
|
||||
}
|
||||
|
||||
let tasks = futures::future::try_join_all(tasks.into_iter().map(flatten_err));
|
||||
let client_tasks = futures::future::try_join_all(client_tasks.into_iter().map(flatten_err));
|
||||
let tasks = futures::future::try_join_all(tasks.into_iter().map(proxy::flatten_err));
|
||||
let client_tasks =
|
||||
futures::future::try_join_all(client_tasks.into_iter().map(proxy::flatten_err));
|
||||
tokio::select! {
|
||||
// We are only expecting an error from these forever tasks
|
||||
res = tasks => { res?; },
|
||||
@@ -107,33 +81,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handle unix signals appropriately.
|
||||
async fn handle_signals(token: CancellationToken) -> anyhow::Result<()> {
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
|
||||
let mut hangup = signal(SignalKind::hangup())?;
|
||||
let mut interrupt = signal(SignalKind::interrupt())?;
|
||||
let mut terminate = signal(SignalKind::terminate())?;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Hangup is commonly used for config reload.
|
||||
_ = hangup.recv() => {
|
||||
warn!("received SIGHUP; config reload is not supported");
|
||||
}
|
||||
// Shut down the whole application.
|
||||
_ = interrupt.recv() => {
|
||||
warn!("received SIGINT, exiting immediately");
|
||||
bail!("interrupted");
|
||||
}
|
||||
_ = terminate.recv() => {
|
||||
warn!("received SIGTERM, shutting down once all existing connections have closed");
|
||||
token.cancel();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// ProxyConfig is created at proxy startup, and lives forever.
|
||||
fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig> {
|
||||
let tls_config = match (
|
||||
57
proxy/src/lib.rs
Normal file
57
proxy/src/lib.rs
Normal file
@@ -0,0 +1,57 @@
|
||||
use anyhow::{bail, Context};
|
||||
use futures::{Future, FutureExt};
|
||||
use tokio::task::JoinError;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
|
||||
pub mod auth;
|
||||
pub mod cache;
|
||||
pub mod cancellation;
|
||||
pub mod compute;
|
||||
pub mod config;
|
||||
pub mod console;
|
||||
pub mod error;
|
||||
pub mod http;
|
||||
pub mod logging;
|
||||
pub mod metrics;
|
||||
pub mod parse;
|
||||
pub mod proxy;
|
||||
pub mod sasl;
|
||||
pub mod scram;
|
||||
pub mod stream;
|
||||
pub mod url;
|
||||
pub mod waiters;
|
||||
|
||||
/// Handle unix signals appropriately.
|
||||
pub async fn handle_signals(token: CancellationToken) -> anyhow::Result<()> {
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
|
||||
let mut hangup = signal(SignalKind::hangup())?;
|
||||
let mut interrupt = signal(SignalKind::interrupt())?;
|
||||
let mut terminate = signal(SignalKind::terminate())?;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Hangup is commonly used for config reload.
|
||||
_ = hangup.recv() => {
|
||||
warn!("received SIGHUP; config reload is not supported");
|
||||
}
|
||||
// Shut down the whole application.
|
||||
_ = interrupt.recv() => {
|
||||
warn!("received SIGINT, exiting immediately");
|
||||
bail!("interrupted");
|
||||
}
|
||||
_ = terminate.recv() => {
|
||||
warn!("received SIGTERM, shutting down once all existing connections have closed");
|
||||
token.cancel();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Flattens `Result<Result<T>>` into `Result<T>`.
|
||||
pub async fn flatten_err(
|
||||
f: impl Future<Output = Result<anyhow::Result<()>, JoinError>>,
|
||||
) -> anyhow::Result<()> {
|
||||
f.map(|r| r.context("join error").and_then(|x| x)).await
|
||||
}
|
||||
@@ -5,7 +5,7 @@ use crate::{
|
||||
auth::{self, backend::AuthSuccess},
|
||||
cancellation::{self, CancelMap},
|
||||
compute::{self, PostgresConnection},
|
||||
config::{ProxyConfig, TlsConfig},
|
||||
config::ProxyConfig,
|
||||
console::{self, messages::MetricsAuxInfo},
|
||||
error::io_error,
|
||||
stream::{PqStream, Stream},
|
||||
@@ -174,7 +174,7 @@ async fn handle_client(
|
||||
NUM_CONNECTIONS_CLOSED_COUNTER.inc();
|
||||
}
|
||||
|
||||
let tls = config.tls_config.as_ref();
|
||||
let tls = config.tls_config.as_ref().map(|t| t.to_server_config());
|
||||
let do_handshake = handshake(stream, tls, cancel_map);
|
||||
let (mut stream, params) = match do_handshake.await? {
|
||||
Some(x) => x,
|
||||
@@ -184,7 +184,10 @@ async fn handle_client(
|
||||
// Extract credentials which we're going to use for auth.
|
||||
let creds = {
|
||||
let sni = stream.get_ref().sni_hostname();
|
||||
let common_names = tls.and_then(|tls| tls.common_names.clone());
|
||||
let common_names = config
|
||||
.tls_config
|
||||
.as_ref()
|
||||
.and_then(|tls| tls.common_names.clone());
|
||||
let result = config
|
||||
.auth_backend
|
||||
.as_ref()
|
||||
@@ -205,13 +208,14 @@ async fn handle_client(
|
||||
/// It's easier to work with owned `stream` here as we need to upgrade it to TLS;
|
||||
/// we also take an extra care of propagating only the select handshake errors to client.
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
stream: S,
|
||||
mut tls: Option<&TlsConfig>,
|
||||
tls: Option<Arc<rustls::ServerConfig>>,
|
||||
cancel_map: &CancelMap,
|
||||
) -> anyhow::Result<Option<(PqStream<Stream<S>>, StartupMessageParams)>> {
|
||||
// Client may try upgrading to each protocol only once
|
||||
let (mut tried_ssl, mut tried_gss) = (false, false);
|
||||
let mut tls_upgraded = false;
|
||||
|
||||
let mut stream = PqStream::new(Stream::from_raw(stream));
|
||||
loop {
|
||||
@@ -226,8 +230,9 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
|
||||
// We can't perform TLS handshake without a config
|
||||
let enc = tls.is_some();
|
||||
|
||||
stream.write_message(&Be::EncryptionResponse(enc)).await?;
|
||||
if let Some(tls) = tls.take() {
|
||||
if let Some(tls) = tls.clone() {
|
||||
// Upgrade raw stream into a secure TLS-backed stream.
|
||||
// NOTE: We've consumed `tls`; this fact will be used later.
|
||||
|
||||
@@ -241,7 +246,8 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
if !read_buf.is_empty() {
|
||||
bail!("data is sent before server replied with EncryptionResponse");
|
||||
}
|
||||
stream = PqStream::new(raw.upgrade(tls.to_server_config()).await?);
|
||||
stream = PqStream::new(raw.upgrade(tls).await?);
|
||||
tls_upgraded = true;
|
||||
}
|
||||
}
|
||||
_ => bail!(ERR_PROTO_VIOLATION),
|
||||
@@ -256,9 +262,8 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
_ => bail!(ERR_PROTO_VIOLATION),
|
||||
},
|
||||
StartupMessage { params, .. } => {
|
||||
// Check that the config has been consumed during upgrade
|
||||
// OR we didn't provide it at all (for dev purposes).
|
||||
if tls.is_some() {
|
||||
// Check that tls was actually upgraded
|
||||
if !tls_upgraded {
|
||||
stream.throw_error_str(ERR_INSECURE_CONNECTION).await?;
|
||||
}
|
||||
|
||||
@@ -340,7 +345,7 @@ async fn connect_to_compute(
|
||||
|
||||
/// Finish client connection initialization: confirm auth success, send params, etc.
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn prepare_client_connection(
|
||||
pub async fn prepare_client_connection(
|
||||
node: &compute::PostgresConnection,
|
||||
reported_auth_ok: bool,
|
||||
session: cancellation::Session<'_>,
|
||||
@@ -378,7 +383,7 @@ async fn prepare_client_connection(
|
||||
|
||||
/// Forward bytes in both directions (client <-> compute).
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn proxy_pass(
|
||||
pub async fn proxy_pass(
|
||||
client: impl AsyncRead + AsyncWrite + Unpin,
|
||||
compute: impl AsyncRead + AsyncWrite + Unpin,
|
||||
aux: &MetricsAuxInfo,
|
||||
|
||||
139
test_runner/regress/test_sni_router.py
Normal file
139
test_runner/regress/test_sni_router.py
Normal file
@@ -0,0 +1,139 @@
|
||||
import socket
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from types import TracebackType
|
||||
from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, Union, cast
|
||||
|
||||
import backoff # type: ignore
|
||||
import psycopg2
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import PgProtocol, PortDistributor, VanillaPostgres
|
||||
|
||||
|
||||
def generate_tls_cert(cn, certout, keyout):
|
||||
subprocess.run(
|
||||
[
|
||||
"openssl",
|
||||
"req",
|
||||
"-new",
|
||||
"-x509",
|
||||
"-days",
|
||||
"365",
|
||||
"-nodes",
|
||||
"-out",
|
||||
certout,
|
||||
"-keyout",
|
||||
keyout,
|
||||
"-subj",
|
||||
f"/CN={cn}",
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
class PgSniRouter(PgProtocol):
|
||||
def __init__(
|
||||
self,
|
||||
neon_binpath: Path,
|
||||
port: int,
|
||||
destination: str,
|
||||
destination_port: int,
|
||||
tls_cert: Path,
|
||||
tls_key: Path,
|
||||
):
|
||||
# Must use a hostname rather than IP here, for SNI to work
|
||||
host = "localhost"
|
||||
super().__init__(host=host, port=port)
|
||||
|
||||
self.host = host
|
||||
self.neon_binpath = neon_binpath
|
||||
self.port = port
|
||||
self.destination = destination
|
||||
self.destination_port = destination_port
|
||||
self.tls_cert = tls_cert
|
||||
self.tls_key = tls_key
|
||||
self._popen: Optional[subprocess.Popen[bytes]] = None
|
||||
|
||||
def start(self) -> "PgSniRouter":
|
||||
assert self._popen is None
|
||||
args = [
|
||||
str(self.neon_binpath / "pg_sni_router"),
|
||||
*["--listen", f"127.0.0.1:{self.port}"],
|
||||
*["--tls-cert", self.tls_cert],
|
||||
*["--tls-key", self.tls_key],
|
||||
*["--destination", self.destination],
|
||||
*["--destination-port", str(self.destination_port)],
|
||||
]
|
||||
|
||||
self._popen = subprocess.Popen(args)
|
||||
self._wait_until_ready()
|
||||
return self
|
||||
|
||||
@backoff.on_exception(backoff.expo, OSError, max_time=10)
|
||||
def _wait_until_ready(self):
|
||||
socket.create_connection((self.host, self.port))
|
||||
|
||||
# Sends SIGTERM to the proxy if it has been started
|
||||
def terminate(self):
|
||||
if self._popen:
|
||||
self._popen.terminate()
|
||||
|
||||
# Waits for proxy to exit if it has been opened with a default timeout of
|
||||
# two seconds. Raises subprocess.TimeoutExpired if the proxy does not exit in time.
|
||||
def wait_for_exit(self, timeout=2):
|
||||
if self._popen:
|
||||
self._popen.wait(timeout=2)
|
||||
|
||||
def __enter__(self) -> "PgSniRouter":
|
||||
return self
|
||||
|
||||
def __exit__(
|
||||
self,
|
||||
exc_type: Optional[Type[BaseException]],
|
||||
exc: Optional[BaseException],
|
||||
tb: Optional[TracebackType],
|
||||
):
|
||||
if self._popen is not None:
|
||||
self._popen.terminate()
|
||||
try:
|
||||
self._popen.wait(timeout=5)
|
||||
except subprocess.TimeoutExpired:
|
||||
log.warn("failed to gracefully terminate pg_sni_router; killing")
|
||||
self._popen.kill()
|
||||
|
||||
|
||||
def test_pg_sni_router(
|
||||
vanilla_pg: VanillaPostgres,
|
||||
port_distributor: PortDistributor,
|
||||
neon_binpath: Path,
|
||||
test_output_dir: Path,
|
||||
):
|
||||
|
||||
generate_tls_cert(
|
||||
"external.test", test_output_dir / "router.crt", test_output_dir / "router.key"
|
||||
)
|
||||
|
||||
# Start a stand-alone Postgres to test with
|
||||
vanilla_pg.start()
|
||||
pg_port = vanilla_pg.default_options["port"]
|
||||
|
||||
router_port = port_distributor.get_port()
|
||||
|
||||
with PgSniRouter(
|
||||
neon_binpath=neon_binpath,
|
||||
port=router_port,
|
||||
destination="localhost",
|
||||
destination_port=pg_port,
|
||||
tls_cert=test_output_dir / "router.crt",
|
||||
tls_key=test_output_dir / "router.key",
|
||||
) as router:
|
||||
router.start()
|
||||
|
||||
out = router.safe_psql(
|
||||
"select 1",
|
||||
dbname="postgres",
|
||||
sslmode="require",
|
||||
host="localhost.external.test",
|
||||
hostaddr="127.0.0.1",
|
||||
)
|
||||
assert out[0][0] == 1
|
||||
Reference in New Issue
Block a user