mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 06:00:38 +00:00
chore(proxy): remove postgres config parser and md5 support (#9990)
Keeping the `mock` postgres cplane adaptor using "stock" tokio-postgres allows us to remove a lot of dead weight from our actual postgres connection logic.
This commit is contained in:
committed by
Ivan Efremov
parent
f7d5322e8b
commit
60241127e2
@@ -66,7 +66,7 @@ pub(super) async fn authenticate(
|
||||
|
||||
Ok(ComputeCredentials {
|
||||
info: creds,
|
||||
keys: ComputeCredentialKeys::AuthKeys(tokio_postgres::config::AuthKeys::ScramSha256(
|
||||
keys: ComputeCredentialKeys::AuthKeys(postgres_client::config::AuthKeys::ScramSha256(
|
||||
scram_keys,
|
||||
)),
|
||||
})
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use async_trait::async_trait;
|
||||
use postgres_client::config::SslMode;
|
||||
use pq_proto::BeMessage as Be;
|
||||
use thiserror::Error;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_postgres::config::SslMode;
|
||||
use tracing::{info, info_span};
|
||||
|
||||
use super::ComputeCredentialKeys;
|
||||
|
||||
@@ -11,8 +11,8 @@ pub use console_redirect::ConsoleRedirectBackend;
|
||||
pub(crate) use console_redirect::ConsoleRedirectError;
|
||||
use ipnet::{Ipv4Net, Ipv6Net};
|
||||
use local::LocalBackend;
|
||||
use postgres_client::config::AuthKeys;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_postgres::config::AuthKeys;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use crate::auth::credentials::check_peer_addr_is_in_list;
|
||||
|
||||
@@ -227,7 +227,7 @@ pub(crate) async fn validate_password_and_exchange(
|
||||
};
|
||||
|
||||
Ok(sasl::Outcome::Success(ComputeCredentialKeys::AuthKeys(
|
||||
tokio_postgres::config::AuthKeys::ScramSha256(keys),
|
||||
postgres_client::config::AuthKeys::ScramSha256(keys),
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,11 +3,11 @@ use std::sync::Arc;
|
||||
|
||||
use dashmap::DashMap;
|
||||
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
|
||||
use postgres_client::{CancelToken, NoTls};
|
||||
use pq_proto::CancelKeyData;
|
||||
use thiserror::Error;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_postgres::{CancelToken, NoTls};
|
||||
use tracing::{debug, info};
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -44,7 +44,7 @@ pub(crate) enum CancelError {
|
||||
IO(#[from] std::io::Error),
|
||||
|
||||
#[error("{0}")]
|
||||
Postgres(#[from] tokio_postgres::Error),
|
||||
Postgres(#[from] postgres_client::Error),
|
||||
|
||||
#[error("rate limit exceeded")]
|
||||
RateLimit,
|
||||
@@ -70,7 +70,7 @@ impl ReportableError for CancelError {
|
||||
impl<P: CancellationPublisher> CancellationHandler<P> {
|
||||
/// Run async action within an ephemeral session identified by [`CancelKeyData`].
|
||||
pub(crate) fn get_session(self: Arc<Self>) -> Session<P> {
|
||||
// HACK: We'd rather get the real backend_pid but tokio_postgres doesn't
|
||||
// HACK: We'd rather get the real backend_pid but postgres_client doesn't
|
||||
// expose it and we don't want to do another roundtrip to query
|
||||
// for it. The client will be able to notice that this is not the
|
||||
// actual backend_pid, but backend_pid is not used for anything
|
||||
|
||||
@@ -6,6 +6,8 @@ use std::time::Duration;
|
||||
use futures::{FutureExt, TryFutureExt};
|
||||
use itertools::Itertools;
|
||||
use once_cell::sync::OnceCell;
|
||||
use postgres_client::tls::MakeTlsConnect;
|
||||
use postgres_client::{CancelToken, RawConnection};
|
||||
use postgres_protocol::message::backend::NoticeResponseBody;
|
||||
use pq_proto::StartupMessageParams;
|
||||
use rustls::client::danger::ServerCertVerifier;
|
||||
@@ -13,8 +15,6 @@ use rustls::crypto::ring;
|
||||
use rustls::pki_types::InvalidDnsNameError;
|
||||
use thiserror::Error;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_postgres::tls::MakeTlsConnect;
|
||||
use tokio_postgres::{CancelToken, RawConnection};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::auth::parse_endpoint_param;
|
||||
@@ -34,9 +34,9 @@ pub const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";
|
||||
#[derive(Debug, Error)]
|
||||
pub(crate) enum ConnectionError {
|
||||
/// This error doesn't seem to reveal any secrets; for instance,
|
||||
/// `tokio_postgres::error::Kind` doesn't contain ip addresses and such.
|
||||
/// `postgres_client::error::Kind` doesn't contain ip addresses and such.
|
||||
#[error("{COULD_NOT_CONNECT}: {0}")]
|
||||
Postgres(#[from] tokio_postgres::Error),
|
||||
Postgres(#[from] postgres_client::Error),
|
||||
|
||||
#[error("{COULD_NOT_CONNECT}: {0}")]
|
||||
CouldNotConnect(#[from] io::Error),
|
||||
@@ -99,13 +99,13 @@ impl ReportableError for ConnectionError {
|
||||
}
|
||||
|
||||
/// A pair of `ClientKey` & `ServerKey` for `SCRAM-SHA-256`.
|
||||
pub(crate) type ScramKeys = tokio_postgres::config::ScramKeys<32>;
|
||||
pub(crate) type ScramKeys = postgres_client::config::ScramKeys<32>;
|
||||
|
||||
/// A config for establishing a connection to compute node.
|
||||
/// Eventually, `tokio_postgres` will be replaced with something better.
|
||||
/// Eventually, `postgres_client` will be replaced with something better.
|
||||
/// Newtype allows us to implement methods on top of it.
|
||||
#[derive(Clone, Default)]
|
||||
pub(crate) struct ConnCfg(Box<tokio_postgres::Config>);
|
||||
pub(crate) struct ConnCfg(Box<postgres_client::Config>);
|
||||
|
||||
/// Creation and initialization routines.
|
||||
impl ConnCfg {
|
||||
@@ -126,7 +126,7 @@ impl ConnCfg {
|
||||
|
||||
pub(crate) fn get_host(&self) -> Result<Host, WakeComputeError> {
|
||||
match self.0.get_hosts() {
|
||||
[tokio_postgres::config::Host::Tcp(s)] => Ok(s.into()),
|
||||
[postgres_client::config::Host::Tcp(s)] => Ok(s.into()),
|
||||
// we should not have multiple address or unix addresses.
|
||||
_ => Err(WakeComputeError::BadComputeAddress(
|
||||
"invalid compute address".into(),
|
||||
@@ -160,7 +160,7 @@ impl ConnCfg {
|
||||
|
||||
// TODO: This is especially ugly...
|
||||
if let Some(replication) = params.get("replication") {
|
||||
use tokio_postgres::config::ReplicationMode;
|
||||
use postgres_client::config::ReplicationMode;
|
||||
match replication {
|
||||
"true" | "on" | "yes" | "1" => {
|
||||
self.replication_mode(ReplicationMode::Physical);
|
||||
@@ -182,7 +182,7 @@ impl ConnCfg {
|
||||
}
|
||||
|
||||
impl std::ops::Deref for ConnCfg {
|
||||
type Target = tokio_postgres::Config;
|
||||
type Target = postgres_client::Config;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
@@ -199,7 +199,7 @@ impl std::ops::DerefMut for ConnCfg {
|
||||
impl ConnCfg {
|
||||
/// Establish a raw TCP connection to the compute node.
|
||||
async fn connect_raw(&self, timeout: Duration) -> io::Result<(SocketAddr, TcpStream, &str)> {
|
||||
use tokio_postgres::config::Host;
|
||||
use postgres_client::config::Host;
|
||||
|
||||
// wrap TcpStream::connect with timeout
|
||||
let connect_with_timeout = |host, port| {
|
||||
@@ -224,7 +224,7 @@ impl ConnCfg {
|
||||
})
|
||||
};
|
||||
|
||||
// We can't reuse connection establishing logic from `tokio_postgres` here,
|
||||
// We can't reuse connection establishing logic from `postgres_client` here,
|
||||
// because it has no means for extracting the underlying socket which we
|
||||
// require for our business.
|
||||
let mut connection_error = None;
|
||||
@@ -272,7 +272,7 @@ type RustlsStream = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>
|
||||
pub(crate) struct PostgresConnection {
|
||||
/// Socket connected to a compute node.
|
||||
pub(crate) stream:
|
||||
tokio_postgres::maybe_tls_stream::MaybeTlsStream<tokio::net::TcpStream, RustlsStream>,
|
||||
postgres_client::maybe_tls_stream::MaybeTlsStream<tokio::net::TcpStream, RustlsStream>,
|
||||
/// PostgreSQL connection parameters.
|
||||
pub(crate) params: std::collections::HashMap<String, String>,
|
||||
/// Query cancellation token.
|
||||
|
||||
@@ -5,7 +5,6 @@ use std::sync::Arc;
|
||||
|
||||
use futures::TryFutureExt;
|
||||
use thiserror::Error;
|
||||
use tokio_postgres::config::SslMode;
|
||||
use tokio_postgres::Client;
|
||||
use tracing::{error, info, info_span, warn, Instrument};
|
||||
|
||||
@@ -165,7 +164,7 @@ impl MockControlPlane {
|
||||
config
|
||||
.host(self.endpoint.host_str().unwrap_or("localhost"))
|
||||
.port(self.endpoint.port().unwrap_or(5432))
|
||||
.ssl_mode(SslMode::Disable);
|
||||
.ssl_mode(postgres_client::config::SslMode::Disable);
|
||||
|
||||
let node = NodeInfo {
|
||||
config,
|
||||
|
||||
@@ -6,8 +6,8 @@ use std::time::Duration;
|
||||
use ::http::header::AUTHORIZATION;
|
||||
use ::http::HeaderName;
|
||||
use futures::TryFutureExt;
|
||||
use postgres_client::config::SslMode;
|
||||
use tokio::time::Instant;
|
||||
use tokio_postgres::config::SslMode;
|
||||
use tracing::{debug, info, info_span, warn, Instrument};
|
||||
|
||||
use super::super::messages::{ControlPlaneErrorMessage, GetRoleSecret, WakeCompute};
|
||||
|
||||
@@ -84,7 +84,7 @@ pub(crate) trait ReportableError: fmt::Display + Send + 'static {
|
||||
fn get_error_kind(&self) -> ErrorKind;
|
||||
}
|
||||
|
||||
impl ReportableError for tokio_postgres::error::Error {
|
||||
impl ReportableError for postgres_client::error::Error {
|
||||
fn get_error_kind(&self) -> ErrorKind {
|
||||
if self.as_db_error().is_some() {
|
||||
ErrorKind::Postgres
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
use std::convert::TryFrom;
|
||||
use std::sync::Arc;
|
||||
|
||||
use postgres_client::tls::MakeTlsConnect;
|
||||
use rustls::pki_types::ServerName;
|
||||
use rustls::ClientConfig;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_postgres::tls::MakeTlsConnect;
|
||||
|
||||
mod private {
|
||||
use std::future::Future;
|
||||
@@ -12,9 +12,9 @@ mod private {
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use postgres_client::tls::{ChannelBinding, TlsConnect};
|
||||
use rustls::pki_types::ServerName;
|
||||
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||
use tokio_postgres::tls::{ChannelBinding, TlsConnect};
|
||||
use tokio_rustls::client::TlsStream;
|
||||
use tokio_rustls::TlsConnector;
|
||||
|
||||
@@ -59,7 +59,7 @@ mod private {
|
||||
|
||||
pub struct RustlsStream<S>(TlsStream<S>);
|
||||
|
||||
impl<S> tokio_postgres::tls::TlsStream for RustlsStream<S>
|
||||
impl<S> postgres_client::tls::TlsStream for RustlsStream<S>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
|
||||
@@ -31,9 +31,9 @@ impl CouldRetry for io::Error {
|
||||
}
|
||||
}
|
||||
|
||||
impl CouldRetry for tokio_postgres::error::DbError {
|
||||
impl CouldRetry for postgres_client::error::DbError {
|
||||
fn could_retry(&self) -> bool {
|
||||
use tokio_postgres::error::SqlState;
|
||||
use postgres_client::error::SqlState;
|
||||
matches!(
|
||||
self.code(),
|
||||
&SqlState::CONNECTION_FAILURE
|
||||
@@ -43,9 +43,9 @@ impl CouldRetry for tokio_postgres::error::DbError {
|
||||
)
|
||||
}
|
||||
}
|
||||
impl ShouldRetryWakeCompute for tokio_postgres::error::DbError {
|
||||
impl ShouldRetryWakeCompute for postgres_client::error::DbError {
|
||||
fn should_retry_wake_compute(&self) -> bool {
|
||||
use tokio_postgres::error::SqlState;
|
||||
use postgres_client::error::SqlState;
|
||||
// Here are errors that happens after the user successfully authenticated to the database.
|
||||
// TODO: there are pgbouncer errors that should be retried, but they are not listed here.
|
||||
!matches!(
|
||||
@@ -61,21 +61,21 @@ impl ShouldRetryWakeCompute for tokio_postgres::error::DbError {
|
||||
}
|
||||
}
|
||||
|
||||
impl CouldRetry for tokio_postgres::Error {
|
||||
impl CouldRetry for postgres_client::Error {
|
||||
fn could_retry(&self) -> bool {
|
||||
if let Some(io_err) = self.source().and_then(|x| x.downcast_ref()) {
|
||||
io::Error::could_retry(io_err)
|
||||
} else if let Some(db_err) = self.source().and_then(|x| x.downcast_ref()) {
|
||||
tokio_postgres::error::DbError::could_retry(db_err)
|
||||
postgres_client::error::DbError::could_retry(db_err)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
impl ShouldRetryWakeCompute for tokio_postgres::Error {
|
||||
impl ShouldRetryWakeCompute for postgres_client::Error {
|
||||
fn should_retry_wake_compute(&self) -> bool {
|
||||
if let Some(db_err) = self.source().and_then(|x| x.downcast_ref()) {
|
||||
tokio_postgres::error::DbError::should_retry_wake_compute(db_err)
|
||||
postgres_client::error::DbError::should_retry_wake_compute(db_err)
|
||||
} else {
|
||||
// likely an IO error. Possible the compute has shutdown and the
|
||||
// cache is stale.
|
||||
|
||||
@@ -8,9 +8,9 @@ use std::fmt::Debug;
|
||||
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use postgres_client::tls::TlsConnect;
|
||||
use postgres_protocol::message::frontend;
|
||||
use tokio::io::{AsyncReadExt, DuplexStream};
|
||||
use tokio_postgres::tls::TlsConnect;
|
||||
use tokio_util::codec::{Decoder, Encoder};
|
||||
|
||||
use super::*;
|
||||
@@ -158,8 +158,8 @@ async fn scram_auth_disable_channel_binding() -> anyhow::Result<()> {
|
||||
Scram::new("password").await?,
|
||||
));
|
||||
|
||||
let _client_err = tokio_postgres::Config::new()
|
||||
.channel_binding(tokio_postgres::config::ChannelBinding::Disable)
|
||||
let _client_err = postgres_client::Config::new()
|
||||
.channel_binding(postgres_client::config::ChannelBinding::Disable)
|
||||
.user("user")
|
||||
.dbname("db")
|
||||
.password("password")
|
||||
@@ -175,7 +175,7 @@ async fn scram_auth_disable_channel_binding() -> anyhow::Result<()> {
|
||||
async fn scram_auth_prefer_channel_binding() -> anyhow::Result<()> {
|
||||
connect_failure(
|
||||
Intercept::None,
|
||||
tokio_postgres::config::ChannelBinding::Prefer,
|
||||
postgres_client::config::ChannelBinding::Prefer,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -185,7 +185,7 @@ async fn scram_auth_prefer_channel_binding() -> anyhow::Result<()> {
|
||||
async fn scram_auth_prefer_channel_binding_intercept() -> anyhow::Result<()> {
|
||||
connect_failure(
|
||||
Intercept::Methods,
|
||||
tokio_postgres::config::ChannelBinding::Prefer,
|
||||
postgres_client::config::ChannelBinding::Prefer,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -195,7 +195,7 @@ async fn scram_auth_prefer_channel_binding_intercept() -> anyhow::Result<()> {
|
||||
async fn scram_auth_prefer_channel_binding_intercept_response() -> anyhow::Result<()> {
|
||||
connect_failure(
|
||||
Intercept::SASLResponse,
|
||||
tokio_postgres::config::ChannelBinding::Prefer,
|
||||
postgres_client::config::ChannelBinding::Prefer,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -205,7 +205,7 @@ async fn scram_auth_prefer_channel_binding_intercept_response() -> anyhow::Resul
|
||||
async fn scram_auth_require_channel_binding() -> anyhow::Result<()> {
|
||||
connect_failure(
|
||||
Intercept::None,
|
||||
tokio_postgres::config::ChannelBinding::Require,
|
||||
postgres_client::config::ChannelBinding::Require,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -215,7 +215,7 @@ async fn scram_auth_require_channel_binding() -> anyhow::Result<()> {
|
||||
async fn scram_auth_require_channel_binding_intercept() -> anyhow::Result<()> {
|
||||
connect_failure(
|
||||
Intercept::Methods,
|
||||
tokio_postgres::config::ChannelBinding::Require,
|
||||
postgres_client::config::ChannelBinding::Require,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -225,14 +225,14 @@ async fn scram_auth_require_channel_binding_intercept() -> anyhow::Result<()> {
|
||||
async fn scram_auth_require_channel_binding_intercept_response() -> anyhow::Result<()> {
|
||||
connect_failure(
|
||||
Intercept::SASLResponse,
|
||||
tokio_postgres::config::ChannelBinding::Require,
|
||||
postgres_client::config::ChannelBinding::Require,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn connect_failure(
|
||||
intercept: Intercept,
|
||||
channel_binding: tokio_postgres::config::ChannelBinding,
|
||||
channel_binding: postgres_client::config::ChannelBinding,
|
||||
) -> anyhow::Result<()> {
|
||||
let (server, client, client_config, server_config) = proxy_mitm(intercept).await;
|
||||
let proxy = tokio::spawn(dummy_proxy(
|
||||
@@ -241,7 +241,7 @@ async fn connect_failure(
|
||||
Scram::new("password").await?,
|
||||
));
|
||||
|
||||
let _client_err = tokio_postgres::Config::new()
|
||||
let _client_err = postgres_client::Config::new()
|
||||
.channel_binding(channel_binding)
|
||||
.user("user")
|
||||
.dbname("db")
|
||||
|
||||
@@ -7,13 +7,13 @@ use std::time::Duration;
|
||||
use anyhow::{bail, Context};
|
||||
use async_trait::async_trait;
|
||||
use http::StatusCode;
|
||||
use postgres_client::config::SslMode;
|
||||
use postgres_client::tls::{MakeTlsConnect, NoTls};
|
||||
use retry::{retry_after, ShouldRetryWakeCompute};
|
||||
use rstest::rstest;
|
||||
use rustls::crypto::ring;
|
||||
use rustls::pki_types;
|
||||
use tokio::io::DuplexStream;
|
||||
use tokio_postgres::config::SslMode;
|
||||
use tokio_postgres::tls::{MakeTlsConnect, NoTls};
|
||||
|
||||
use super::connect_compute::ConnectMechanism;
|
||||
use super::retry::CouldRetry;
|
||||
@@ -204,7 +204,7 @@ async fn handshake_tls_is_enforced_by_proxy() -> anyhow::Result<()> {
|
||||
let (_, server_config) = generate_tls_config("generic-project-name.localhost", "localhost")?;
|
||||
let proxy = tokio::spawn(dummy_proxy(client, Some(server_config), NoAuth));
|
||||
|
||||
let client_err = tokio_postgres::Config::new()
|
||||
let client_err = postgres_client::Config::new()
|
||||
.user("john_doe")
|
||||
.dbname("earth")
|
||||
.ssl_mode(SslMode::Disable)
|
||||
@@ -233,7 +233,7 @@ async fn handshake_tls() -> anyhow::Result<()> {
|
||||
generate_tls_config("generic-project-name.localhost", "localhost")?;
|
||||
let proxy = tokio::spawn(dummy_proxy(client, Some(server_config), NoAuth));
|
||||
|
||||
let _conn = tokio_postgres::Config::new()
|
||||
let _conn = postgres_client::Config::new()
|
||||
.user("john_doe")
|
||||
.dbname("earth")
|
||||
.ssl_mode(SslMode::Require)
|
||||
@@ -249,7 +249,7 @@ async fn handshake_raw() -> anyhow::Result<()> {
|
||||
|
||||
let proxy = tokio::spawn(dummy_proxy(client, None, NoAuth));
|
||||
|
||||
let _conn = tokio_postgres::Config::new()
|
||||
let _conn = postgres_client::Config::new()
|
||||
.user("john_doe")
|
||||
.dbname("earth")
|
||||
.options("project=generic-project-name")
|
||||
@@ -296,8 +296,8 @@ async fn scram_auth_good(#[case] password: &str) -> anyhow::Result<()> {
|
||||
Scram::new(password).await?,
|
||||
));
|
||||
|
||||
let _conn = tokio_postgres::Config::new()
|
||||
.channel_binding(tokio_postgres::config::ChannelBinding::Require)
|
||||
let _conn = postgres_client::Config::new()
|
||||
.channel_binding(postgres_client::config::ChannelBinding::Require)
|
||||
.user("user")
|
||||
.dbname("db")
|
||||
.password(password)
|
||||
@@ -320,8 +320,8 @@ async fn scram_auth_disable_channel_binding() -> anyhow::Result<()> {
|
||||
Scram::new("password").await?,
|
||||
));
|
||||
|
||||
let _conn = tokio_postgres::Config::new()
|
||||
.channel_binding(tokio_postgres::config::ChannelBinding::Disable)
|
||||
let _conn = postgres_client::Config::new()
|
||||
.channel_binding(postgres_client::config::ChannelBinding::Disable)
|
||||
.user("user")
|
||||
.dbname("db")
|
||||
.password("password")
|
||||
@@ -348,7 +348,7 @@ async fn scram_auth_mock() -> anyhow::Result<()> {
|
||||
.map(char::from)
|
||||
.collect();
|
||||
|
||||
let _client_err = tokio_postgres::Config::new()
|
||||
let _client_err = postgres_client::Config::new()
|
||||
.user("user")
|
||||
.dbname("db")
|
||||
.password(&password) // no password will match the mocked secret
|
||||
|
||||
@@ -37,9 +37,9 @@ use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX};
|
||||
|
||||
pub(crate) struct PoolingBackend {
|
||||
pub(crate) http_conn_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
|
||||
pub(crate) local_pool: Arc<LocalConnPool<tokio_postgres::Client>>,
|
||||
pub(crate) local_pool: Arc<LocalConnPool<postgres_client::Client>>,
|
||||
pub(crate) pool:
|
||||
Arc<GlobalConnPool<tokio_postgres::Client, EndpointConnPool<tokio_postgres::Client>>>,
|
||||
Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
|
||||
|
||||
pub(crate) config: &'static ProxyConfig,
|
||||
pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>,
|
||||
@@ -170,7 +170,7 @@ impl PoolingBackend {
|
||||
conn_info: ConnInfo,
|
||||
keys: ComputeCredentials,
|
||||
force_new: bool,
|
||||
) -> Result<Client<tokio_postgres::Client>, HttpConnError> {
|
||||
) -> Result<Client<postgres_client::Client>, HttpConnError> {
|
||||
let maybe_client = if force_new {
|
||||
debug!("pool: pool is disabled");
|
||||
None
|
||||
@@ -256,7 +256,7 @@ impl PoolingBackend {
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
conn_info: ConnInfo,
|
||||
) -> Result<Client<tokio_postgres::Client>, HttpConnError> {
|
||||
) -> Result<Client<postgres_client::Client>, HttpConnError> {
|
||||
if let Some(client) = self.local_pool.get(ctx, &conn_info)? {
|
||||
return Ok(client);
|
||||
}
|
||||
@@ -315,7 +315,7 @@ impl PoolingBackend {
|
||||
));
|
||||
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
|
||||
let (client, connection) = config.connect(tokio_postgres::NoTls).await?;
|
||||
let (client, connection) = config.connect(postgres_client::NoTls).await?;
|
||||
drop(pause);
|
||||
|
||||
let pid = client.get_process_id();
|
||||
@@ -360,7 +360,7 @@ pub(crate) enum HttpConnError {
|
||||
#[error("pooled connection closed at inconsistent state")]
|
||||
ConnectionClosedAbruptly(#[from] tokio::sync::watch::error::SendError<uuid::Uuid>),
|
||||
#[error("could not connection to postgres in compute")]
|
||||
PostgresConnectionError(#[from] tokio_postgres::Error),
|
||||
PostgresConnectionError(#[from] postgres_client::Error),
|
||||
#[error("could not connection to local-proxy in compute")]
|
||||
LocalProxyConnectionError(#[from] LocalProxyConnError),
|
||||
#[error("could not parse JWT payload")]
|
||||
@@ -479,7 +479,7 @@ impl ShouldRetryWakeCompute for LocalProxyConnError {
|
||||
}
|
||||
|
||||
struct TokioMechanism {
|
||||
pool: Arc<GlobalConnPool<tokio_postgres::Client, EndpointConnPool<tokio_postgres::Client>>>,
|
||||
pool: Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
|
||||
conn_info: ConnInfo,
|
||||
conn_id: uuid::Uuid,
|
||||
|
||||
@@ -489,7 +489,7 @@ struct TokioMechanism {
|
||||
|
||||
#[async_trait]
|
||||
impl ConnectMechanism for TokioMechanism {
|
||||
type Connection = Client<tokio_postgres::Client>;
|
||||
type Connection = Client<postgres_client::Client>;
|
||||
type ConnectError = HttpConnError;
|
||||
type Error = HttpConnError;
|
||||
|
||||
@@ -509,7 +509,7 @@ impl ConnectMechanism for TokioMechanism {
|
||||
.connect_timeout(timeout);
|
||||
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
|
||||
let res = config.connect(tokio_postgres::NoTls).await;
|
||||
let res = config.connect(postgres_client::NoTls).await;
|
||||
drop(pause);
|
||||
let (client, connection) = permit.release_result(res)?;
|
||||
|
||||
|
||||
@@ -5,11 +5,11 @@ use std::task::{ready, Poll};
|
||||
|
||||
use futures::future::poll_fn;
|
||||
use futures::Future;
|
||||
use postgres_client::tls::NoTlsStream;
|
||||
use postgres_client::AsyncMessage;
|
||||
use smallvec::SmallVec;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::Instant;
|
||||
use tokio_postgres::tls::NoTlsStream;
|
||||
use tokio_postgres::AsyncMessage;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, info_span, warn, Instrument};
|
||||
#[cfg(test)]
|
||||
@@ -58,7 +58,7 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
|
||||
ctx: &RequestContext,
|
||||
conn_info: ConnInfo,
|
||||
client: C,
|
||||
mut connection: tokio_postgres::Connection<TcpStream, NoTlsStream>,
|
||||
mut connection: postgres_client::Connection<TcpStream, NoTlsStream>,
|
||||
conn_id: uuid::Uuid,
|
||||
aux: MetricsAuxInfo,
|
||||
) -> Client<C> {
|
||||
|
||||
@@ -7,8 +7,8 @@ use std::time::Duration;
|
||||
|
||||
use dashmap::DashMap;
|
||||
use parking_lot::RwLock;
|
||||
use postgres_client::ReadyForQueryStatus;
|
||||
use rand::Rng;
|
||||
use tokio_postgres::ReadyForQueryStatus;
|
||||
use tracing::{debug, info, Span};
|
||||
|
||||
use super::backend::HttpConnError;
|
||||
@@ -683,7 +683,7 @@ pub(crate) trait ClientInnerExt: Sync + Send + 'static {
|
||||
fn get_process_id(&self) -> i32;
|
||||
}
|
||||
|
||||
impl ClientInnerExt for tokio_postgres::Client {
|
||||
impl ClientInnerExt for postgres_client::Client {
|
||||
fn is_closed(&self) -> bool {
|
||||
self.is_closed()
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use postgres_client::types::{Kind, Type};
|
||||
use postgres_client::Row;
|
||||
use serde_json::{Map, Value};
|
||||
use tokio_postgres::types::{Kind, Type};
|
||||
use tokio_postgres::Row;
|
||||
|
||||
//
|
||||
// Convert json non-string types to strings, so that they can be passed to Postgres
|
||||
@@ -61,7 +61,7 @@ fn json_array_to_pg_array(value: &Value) -> Option<String> {
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum JsonConversionError {
|
||||
#[error("internal error compute returned invalid data: {0}")]
|
||||
AsTextError(tokio_postgres::Error),
|
||||
AsTextError(postgres_client::Error),
|
||||
#[error("parse int error: {0}")]
|
||||
ParseIntError(#[from] std::num::ParseIntError),
|
||||
#[error("parse float error: {0}")]
|
||||
|
||||
@@ -22,13 +22,13 @@ use indexmap::IndexMap;
|
||||
use jose_jwk::jose_b64::base64ct::{Base64UrlUnpadded, Encoding};
|
||||
use p256::ecdsa::{Signature, SigningKey};
|
||||
use parking_lot::RwLock;
|
||||
use postgres_client::tls::NoTlsStream;
|
||||
use postgres_client::types::ToSql;
|
||||
use postgres_client::AsyncMessage;
|
||||
use serde_json::value::RawValue;
|
||||
use signature::Signer;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::Instant;
|
||||
use tokio_postgres::tls::NoTlsStream;
|
||||
use tokio_postgres::types::ToSql;
|
||||
use tokio_postgres::AsyncMessage;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, info_span, warn, Instrument};
|
||||
|
||||
@@ -164,7 +164,7 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
|
||||
ctx: &RequestContext,
|
||||
conn_info: ConnInfo,
|
||||
client: C,
|
||||
mut connection: tokio_postgres::Connection<TcpStream, NoTlsStream>,
|
||||
mut connection: postgres_client::Connection<TcpStream, NoTlsStream>,
|
||||
key: SigningKey,
|
||||
conn_id: uuid::Uuid,
|
||||
aux: MetricsAuxInfo,
|
||||
@@ -280,7 +280,7 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
|
||||
)
|
||||
}
|
||||
|
||||
impl ClientInnerCommon<tokio_postgres::Client> {
|
||||
impl ClientInnerCommon<postgres_client::Client> {
|
||||
pub(crate) async fn set_jwt_session(&mut self, payload: &[u8]) -> Result<(), HttpConnError> {
|
||||
if let ClientDataEnum::Local(local_data) = &mut self.data {
|
||||
local_data.jti += 1;
|
||||
|
||||
@@ -11,12 +11,12 @@ use http_body_util::{BodyExt, Full};
|
||||
use hyper::body::Incoming;
|
||||
use hyper::http::{HeaderName, HeaderValue};
|
||||
use hyper::{header, HeaderMap, Request, Response, StatusCode};
|
||||
use postgres_client::error::{DbError, ErrorPosition, SqlState};
|
||||
use postgres_client::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction};
|
||||
use pq_proto::StartupMessageParamsBuilder;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
use tokio::time::{self, Instant};
|
||||
use tokio_postgres::error::{DbError, ErrorPosition, SqlState};
|
||||
use tokio_postgres::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info};
|
||||
use typed_json::json;
|
||||
@@ -361,7 +361,7 @@ pub(crate) enum SqlOverHttpError {
|
||||
#[error("invalid isolation level")]
|
||||
InvalidIsolationLevel,
|
||||
#[error("{0}")]
|
||||
Postgres(#[from] tokio_postgres::Error),
|
||||
Postgres(#[from] postgres_client::Error),
|
||||
#[error("{0}")]
|
||||
JsonConversion(#[from] JsonConversionError),
|
||||
#[error("{0}")]
|
||||
@@ -986,7 +986,7 @@ async fn query_to_json<T: GenericClient>(
|
||||
// Manually drain the stream into a vector to leave row_stream hanging
|
||||
// around to get a command tag. Also check that the response is not too
|
||||
// big.
|
||||
let mut rows: Vec<tokio_postgres::Row> = Vec::new();
|
||||
let mut rows: Vec<postgres_client::Row> = Vec::new();
|
||||
while let Some(row) = row_stream.next().await {
|
||||
let row = row?;
|
||||
*current_size += row.body_len();
|
||||
@@ -1063,13 +1063,13 @@ async fn query_to_json<T: GenericClient>(
|
||||
}
|
||||
|
||||
enum Client {
|
||||
Remote(conn_pool_lib::Client<tokio_postgres::Client>),
|
||||
Local(conn_pool_lib::Client<tokio_postgres::Client>),
|
||||
Remote(conn_pool_lib::Client<postgres_client::Client>),
|
||||
Local(conn_pool_lib::Client<postgres_client::Client>),
|
||||
}
|
||||
|
||||
enum Discard<'a> {
|
||||
Remote(conn_pool_lib::Discard<'a, tokio_postgres::Client>),
|
||||
Local(conn_pool_lib::Discard<'a, tokio_postgres::Client>),
|
||||
Remote(conn_pool_lib::Discard<'a, postgres_client::Client>),
|
||||
Local(conn_pool_lib::Discard<'a, postgres_client::Client>),
|
||||
}
|
||||
|
||||
impl Client {
|
||||
@@ -1080,7 +1080,7 @@ impl Client {
|
||||
}
|
||||
}
|
||||
|
||||
fn inner(&mut self) -> (&mut tokio_postgres::Client, Discard<'_>) {
|
||||
fn inner(&mut self) -> (&mut postgres_client::Client, Discard<'_>) {
|
||||
match self {
|
||||
Client::Remote(client) => {
|
||||
let (c, d) = client.inner();
|
||||
|
||||
Reference in New Issue
Block a user