[proxy] Forward compute connection params to client

This fixes all kinds of problems related to missing params,
like broken timestamps (due to `integer_datetimes`).

This solution is not ideal, but it will help. Meanwhile,
I'm going to dedicate some time to improving connection machinery.

Note that this **does not** fix problems with passing certain parameters
in a reverse direction, i.e. **from client to compute**. This is a
separate matter and will be dealt with in an upcoming PR.
This commit is contained in:
Dmitry Ivanov
2022-12-15 18:10:43 +03:00
parent 64775a0a75
commit 83baf49487
19 changed files with 137 additions and 95 deletions

View File

@@ -1,6 +1,6 @@
use super::{AuthSuccess, NodeInfo};
use crate::{auth, compute, error::UserFacingError, stream::PqStream, waiters};
use pq_proto::{BeMessage as Be, BeParameterStatusMessage};
use pq_proto::BeMessage as Be;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, info_span};
@@ -60,7 +60,7 @@ pub async fn handle_user(
info!(parent: &span, "sending the auth URL to the user");
client
.write_message_noflush(&Be::AuthenticationOk)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?
.write_message_noflush(&Be::CLIENT_ENCODING)?
.write_message(&Be::NoticeResponse(&greeting))
.await?;

View File

@@ -8,18 +8,17 @@ use tokio::net::TcpStream;
use tokio_postgres::NoTls;
use tracing::{error, info};
const COULD_NOT_CONNECT: &str = "Could not connect to compute node";
#[derive(Debug, Error)]
pub enum ConnectionError {
/// This error doesn't seem to reveal any secrets; for instance,
/// [`tokio_postgres::error::Kind`] doesn't contain ip addresses and such.
#[error("Failed to connect to the compute node: {0}")]
#[error("{COULD_NOT_CONNECT}: {0}")]
Postgres(#[from] tokio_postgres::Error),
#[error("Failed to connect to the compute node")]
FailedToConnectToCompute,
#[error("Failed to fetch compute node version")]
FailedToFetchPgVersion,
#[error("{COULD_NOT_CONNECT}: {0}")]
CouldNotConnect(#[from] io::Error),
}
impl UserFacingError for ConnectionError {
@@ -29,10 +28,10 @@ impl UserFacingError for ConnectionError {
// This helps us drop irrelevant library-specific prefixes.
// TODO: propagate severity level and other parameters.
Postgres(err) => match err.as_db_error() {
Some(err) => err.message().to_string(),
Some(err) => err.message().to_owned(),
None => err.to_string(),
},
other => other.to_string(),
_ => COULD_NOT_CONNECT.to_owned(),
}
}
}
@@ -49,7 +48,7 @@ pub struct ConnCfg(pub tokio_postgres::Config);
impl ConnCfg {
/// Construct a new connection config.
pub fn new() -> Self {
Self(tokio_postgres::Config::new())
Self(Default::default())
}
}
@@ -95,7 +94,7 @@ impl ConnCfg {
io::ErrorKind::Other,
format!(
"couldn't connect: bad compute config, \
ports and hosts entries' count does not match: {:?}",
ports and hosts entries' count does not match: {:?}",
self.0
),
));
@@ -131,8 +130,8 @@ impl ConnCfg {
pub struct PostgresConnection {
/// Socket connected to a compute node.
pub stream: TcpStream,
/// PostgreSQL version of this instance.
pub version: String,
/// PostgreSQL connection parameters.
pub params: std::collections::HashMap<String, String>,
}
impl ConnCfg {
@@ -156,6 +155,7 @@ impl ConnCfg {
self.0.application_name(app_name);
}
// TODO: This is especially ugly...
if let Some(replication) = params.get("replication") {
use tokio_postgres::config::ReplicationMode;
match replication {
@@ -172,22 +172,24 @@ impl ConnCfg {
// TODO: extend the list of the forwarded startup parameters.
// Currently, tokio-postgres doesn't allow us to pass
// arbitrary parameters, but the ones above are a good start.
//
// This and the reverse params problem can be better addressed
// in a bespoke connection machinery (a new library for that sake).
let (socket_addr, mut stream) = self
.connect_raw()
.await
.map_err(|_| ConnectionError::FailedToConnectToCompute)?;
// TODO: establish a secure connection to the DB
let (client, conn) = self.0.connect_raw(&mut stream, NoTls).await?;
let version = conn
.parameter("server_version")
.ok_or(ConnectionError::FailedToFetchPgVersion)?
.into();
// TODO: establish a secure connection to the DB.
let (socket_addr, mut stream) = self.connect_raw().await?;
let (client, connection) = self.0.connect_raw(&mut stream, NoTls).await?;
info!("connected to user's compute node at {socket_addr}");
// This is very ugly but as of now there's no better way to
// extract the connection parameters from tokio-postgres' connection.
// TODO: solve this problem in a more elegant manner (e.g. the new library).
let params = connection.parameters;
// NB: CancelToken is supposed to hold socket_addr, but we use connect_raw.
// Yet another reason to rework the connection establishing code.
let cancel_closure = CancelClosure::new(socket_addr, client.cancel_token());
let db = PostgresConnection { stream, version };
let db = PostgresConnection { stream, params };
Ok((db, cancel_closure))
}

View File

@@ -255,15 +255,21 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {
// Note that we do this only (for the most part) after we've connected
// to a compute (see above) which performs its own authentication.
if !auth_result.reported_auth_ok {
stream
.write_message_noflush(&Be::AuthenticationOk)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?;
stream.write_message_noflush(&Be::AuthenticationOk)?;
}
// Forward all postgres connection params to the client.
// Right now the implementation is very hacky and inefficent (ideally,
// we don't need an intermediate hashmap), but at least it should be correct.
for (name, value) in &db.params {
// TODO: Theoretically, this could result in a big pile of params...
stream.write_message_noflush(&Be::ParameterStatus {
name: name.as_bytes(),
value: value.as_bytes(),
})?;
}
stream
.write_message_noflush(&BeMessage::ParameterStatus(
BeParameterStatusMessage::ServerVersion(&db.version),
))?
.write_message_noflush(&Be::BackendKeyData(cancel_key_data))?
.write_message(&BeMessage::ReadyForQuery)
.await?;

View File

@@ -139,7 +139,7 @@ async fn dummy_proxy(
stream
.write_message_noflush(&Be::AuthenticationOk)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?
.write_message_noflush(&Be::CLIENT_ENCODING)?
.write_message(&BeMessage::ReadyForQuery)
.await?;