Files
neon/libs/proxy/tokio-postgres2/src/connect.rs
Conrad Ludgate 6768a71c86 proxy(tokio-postgres): refactor typeinfo query to occur earlier (#11993)
## Problem

For #11992 I realised we need to get the type info before executing the
query. This is important to know how to decode rows with custom types,
eg the following query:

```sql
CREATE TYPE foo AS ENUM ('foo','bar','baz');
SELECT ARRAY['foo'::foo, 'bar'::foo, 'baz'::foo] AS data;
```

Getting that to work was harder that it seems. The original
tokio-postgres setup has a split between `Client` and `Connection`,
where messages are passed between. Because multiple clients were
supported, each client message included a dedicated response channel.
Each request would be terminated by the `ReadyForQuery` message.

The flow I opted to use for parsing types early would not trigger a
`ReadyForQuery`. The flow is as follows:

```
PARSE ""    // parse the user provided query
DESCRIBE "" // describe the query, returning param/result type oids
FLUSH       // force postgres to flush the responses early

// wait for descriptions

  // check if we know the types, if we don't then
  // setup the typeinfo query and execute it against each OID:

  PARSE typeinfo    // prepare our typeinfo query
  DESCRIBE typeinfo
  FLUSH // force postgres to flush the responses early

  // wait for typeinfo statement

    // for each OID we don't know:
    BIND typeinfo
    EXECUTE
    FLUSH

    // wait for type info, might reveal more OIDs to inspect

  // close the typeinfo query, we cache the OID->type map and this is kinder to pgbouncer.
  CLOSE typeinfo 

// finally once we know all the OIDs:
BIND ""   // bind the user provided query - already parsed - to the user provided params
EXECUTE   // run the user provided query
SYNC      // commit the transaction
```

## Summary of changes

Please review commit by commit. The main challenge was allowing one
query to issue multiple sub-queries. To do this I first made sure that
the client could fully own the connection, which required removing any
shared client state. I then had to replace the way responses are sent to
the client, by using only a single permanent channel. This required some
additional effort to track which query is being processed. Lastly I had
to modify the query/typeinfo functions to not issue `sync` commands, so
it would fit into the desired flow above.

To note: the flow above does force an extra roundtrip into each query. I
don't know yet if this has a measurable latency overhead.
2025-05-23 19:41:12 +00:00

83 lines
2.1 KiB
Rust

use std::net::IpAddr;
use postgres_protocol2::message::backend::Message;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use crate::client::SocketConfig;
use crate::codec::BackendMessage;
use crate::config::Host;
use crate::connect_raw::connect_raw;
use crate::connect_socket::connect_socket;
use crate::tls::{MakeTlsConnect, TlsConnect};
use crate::{Client, Config, Connection, Error, RawConnection};
pub async fn connect<T>(
mut tls: T,
config: &Config,
) -> Result<(Client, Connection<TcpStream, T::Stream>), Error>
where
T: MakeTlsConnect<TcpStream>,
{
let hostname = match &config.host {
Host::Tcp(host) => host.as_str(),
};
let tls = tls
.make_tls_connect(hostname)
.map_err(|e| Error::tls(e.into()))?;
match connect_once(config.host_addr, &config.host, config.port, tls, config).await {
Ok((client, connection)) => Ok((client, connection)),
Err(e) => Err(e),
}
}
async fn connect_once<T>(
host_addr: Option<IpAddr>,
host: &Host,
port: u16,
tls: T,
config: &Config,
) -> Result<(Client, Connection<TcpStream, T::Stream>), Error>
where
T: TlsConnect<TcpStream>,
{
let socket = connect_socket(host_addr, host, port, config.connect_timeout).await?;
let RawConnection {
stream,
parameters,
delayed_notice,
process_id,
secret_key,
} = connect_raw(socket, tls, config).await?;
let socket_config = SocketConfig {
host_addr,
host: host.clone(),
port,
connect_timeout: config.connect_timeout,
};
let (client_tx, conn_rx) = mpsc::unbounded_channel();
let (conn_tx, client_rx) = mpsc::channel(4);
let client = Client::new(
client_tx,
client_rx,
socket_config,
config.ssl_mode,
process_id,
secret_key,
);
// delayed notices are always sent as "Async" messages.
let delayed = delayed_notice
.into_iter()
.map(|m| BackendMessage::Async(Message::NoticeResponse(m)))
.collect();
let connection = Connection::new(stream, delayed, parameters, conn_tx, conn_rx);
Ok((client, connection))
}