neon/libs/proxy/tokio-postgres2/src/statement.rs
Conrad Ludgate 6768a71c86 proxy(tokio-postgres): refactor typeinfo query to occur earlier (#11993)
## Problem

For #11992 I realised we need to get the type info before executing the
query. This is important to know how to decode rows with custom types,
eg the following query:

```sql
CREATE TYPE foo AS ENUM ('foo','bar','baz');
SELECT ARRAY['foo'::foo, 'bar'::foo, 'baz'::foo] AS data;
```

Getting that to work was harder than it seems. The original
tokio-postgres setup is split into a `Client` and a `Connection`,
with messages passed between them. Because multiple clients were
supported, each client message included a dedicated response channel.
Each request would be terminated by the `ReadyForQuery` message.

The flow I opted to use for parsing types early would not trigger a
`ReadyForQuery` between steps, since postgres only sends it in response
to `Sync`. The flow is as follows:

```
PARSE ""    // parse the user provided query
DESCRIBE "" // describe the query, returning param/result type oids
FLUSH       // force postgres to flush the responses early

// wait for descriptions

  // check if we know the types, if we don't then
  // setup the typeinfo query and execute it against each OID:

  PARSE typeinfo    // prepare our typeinfo query
  DESCRIBE typeinfo
  FLUSH // force postgres to flush the responses early

  // wait for typeinfo statement

    // for each OID we don't know:
    BIND typeinfo
    EXECUTE
    FLUSH

    // wait for type info, might reveal more OIDs to inspect

  // close the typeinfo query, we cache the OID->type map and this is kinder to pgbouncer.
  CLOSE typeinfo

// finally once we know all the OIDs:
BIND ""   // bind the user provided query - already parsed - to the user provided params
EXECUTE   // run the user provided query
SYNC      // commit the transaction
```
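
The "might reveal more OIDs to inspect" step can be sketched as a small worklist loop. This is only an illustration under stated assumptions, not the code in this PR: `TypeInfo`, `demo_catalog`, `resolve_types`, and the OIDs are all hypothetical, and the `lookup` closure stands in for one `BIND typeinfo` / `EXECUTE` / `FLUSH` roundtrip.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified type description (not the crate's `Type`).
#[derive(Clone, Debug, PartialEq)]
struct TypeInfo {
    name: String,
    /// Element OID, set only for array types.
    elem_oid: Option<u32>,
}

/// Stand-in for the server catalog; the OIDs are made up for illustration.
fn demo_catalog() -> HashMap<u32, TypeInfo> {
    HashMap::from([
        (16400, TypeInfo { name: "_foo".to_string(), elem_oid: Some(16399) }),
        (16399, TypeInfo { name: "foo".to_string(), elem_oid: None }),
    ])
}

/// Resolve `oid` and every OID it transitively refers to, filling `cache`.
/// `lookup` is an assumed stand-in for one typeinfo roundtrip.
/// Returns how many lookups were needed.
fn resolve_types<F>(
    oid: u32,
    cache: &mut HashMap<u32, TypeInfo>,
    mut lookup: F,
) -> Result<usize, String>
where
    F: FnMut(u32) -> Option<TypeInfo>,
{
    let mut pending = vec![oid];
    let mut roundtrips = 0;
    while let Some(next) = pending.pop() {
        // Already known: no roundtrip needed.
        if cache.contains_key(&next) {
            continue;
        }
        let info = lookup(next).ok_or_else(|| format!("unknown type oid {next}"))?;
        roundtrips += 1;
        // An array type reveals its element OID, which may also be unknown.
        if let Some(elem) = info.elem_oid {
            pending.push(elem);
        }
        cache.insert(next, info);
    }
    Ok(roundtrips)
}

fn main() {
    let catalog = demo_catalog();
    let mut cache = HashMap::new();
    let n = resolve_types(16400, &mut cache, |oid| catalog.get(&oid).cloned()).unwrap();
    println!("resolved {} types in {n} roundtrips", cache.len());
}
```

Because the cache persists, a second query using the same types should need no typeinfo lookups at all in this sketch.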

## Summary of changes

Please review commit by commit. The main challenge was allowing one
query to issue multiple sub-queries. To do this I first made sure that
the client could fully own the connection, which required removing any
shared client state. I then had to replace the way responses are sent to
the client, by using only a single permanent channel. This required some
additional effort to track which query is being processed. Lastly I had
to modify the query/typeinfo functions to not issue `sync` commands, so
it would fit into the desired flow above.
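
To illustrate the single-channel idea, here is a minimal sketch in which the client owns the only receiver and decides for itself where each step ends, instead of relying on `ReadyForQuery`. It is an assumption-laden simplification: `Backend`, `Client`, `read_step`, and `demo_client` are hypothetical names, the message set is heavily reduced, and `std::sync::mpsc` stands in for whatever async channel the real code uses.

```rust
use std::sync::mpsc;

/// Hypothetical, heavily simplified backend messages.
#[derive(Debug, PartialEq)]
enum Backend {
    ParseComplete,
    RowDescription,
    DataRow(String),
    CommandComplete,
    ReadyForQuery,
}

/// A client that owns the only receiving end of the response channel.
struct Client {
    responses: mpsc::Receiver<Backend>,
}

impl Client {
    /// Collect responses for the current step, stopping at the first
    /// message for which `is_last` returns true.
    fn read_step(
        &self,
        is_last: impl Fn(&Backend) -> bool,
    ) -> Result<Vec<Backend>, mpsc::RecvError> {
        let mut step = Vec::new();
        loop {
            let msg = self.responses.recv()?;
            let done = is_last(&msg);
            step.push(msg);
            if done {
                return Ok(step);
            }
        }
    }
}

/// Simulate the connection side by pre-filling the channel with the
/// replies to PARSE/DESCRIBE/FLUSH followed by BIND/EXECUTE/SYNC.
fn demo_client() -> Client {
    let (tx, rx) = mpsc::channel();
    for msg in [
        Backend::ParseComplete,
        Backend::RowDescription,
        Backend::DataRow("{foo,bar,baz}".to_string()),
        Backend::CommandComplete,
        Backend::ReadyForQuery,
    ] {
        tx.send(msg).unwrap();
    }
    Client { responses: rx }
}

fn main() {
    let client = demo_client();
    // The describe step ends at RowDescription; no ReadyForQuery arrives here.
    let describe = client.read_step(|m| *m == Backend::RowDescription).unwrap();
    // Only the final step, which sent SYNC, ends with ReadyForQuery.
    let execute = client.read_step(|m| *m == Backend::ReadyForQuery).unwrap();
    println!("{describe:?}\n{execute:?}");
}
```

The point of the design in this sketch is that the caller, not the channel, knows which sub-query it is waiting on, so one query can run several sub-steps over the same channel.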

Note: the flow above adds an extra roundtrip to each query. I don't yet
know whether this has a measurable latency overhead.
2025-05-23 19:41:12 +00:00


use std::fmt;
use std::sync::Arc;

use crate::types::Type;
use postgres_protocol2::Oid;
use postgres_protocol2::message::backend::Field;

/// Shared state behind a `Statement` handle.
struct StatementInner {
    name: &'static str,
    columns: Vec<Column>,
}

/// A prepared statement.
///
/// Prepared statements can only be used with the connection that created them.
#[derive(Clone)]
pub struct Statement(Arc<StatementInner>);

impl Statement {
    pub(crate) fn new(name: &'static str, columns: Vec<Column>) -> Statement {
        Statement(Arc::new(StatementInner { name, columns }))
    }

    pub(crate) fn name(&self) -> &str {
        self.0.name
    }

    /// Returns information about the columns returned when the statement is queried.
    pub fn columns(&self) -> &[Column] {
        &self.0.columns
    }
}

/// Information about a column of a query.
pub struct Column {
    name: String,
    pub(crate) type_: Type,

    // raw fields from RowDescription
    table_oid: Oid,
    column_id: i16,
    format: i16,

    // these would be better stored in self.type_, but that needs a more radical refactoring
    type_oid: Oid,
    type_size: i16,
    type_modifier: i32,
}

impl Column {
    pub(crate) fn new(name: String, type_: Type, raw_field: Field<'_>) -> Column {
        Column {
            name,
            type_,
            table_oid: raw_field.table_oid(),
            column_id: raw_field.column_id(),
            format: raw_field.format(),
            type_oid: raw_field.type_oid(),
            type_size: raw_field.type_size(),
            type_modifier: raw_field.type_modifier(),
        }
    }

    /// Returns the name of the column.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Returns the type of the column.
    pub fn type_(&self) -> &Type {
        &self.type_
    }

    /// Returns the table OID of the column.
    pub fn table_oid(&self) -> Oid {
        self.table_oid
    }

    /// Returns the column ID of the column.
    pub fn column_id(&self) -> i16 {
        self.column_id
    }

    /// Returns the format of the column.
    pub fn format(&self) -> i16 {
        self.format
    }

    /// Returns the type OID of the column.
    pub fn type_oid(&self) -> Oid {
        self.type_oid
    }

    /// Returns the type size of the column.
    pub fn type_size(&self) -> i16 {
        self.type_size
    }

    /// Returns the type modifier of the column.
    pub fn type_modifier(&self) -> i32 {
        self.type_modifier
    }
}

// Only the name and type are shown in debug output; the raw fields are omitted.
impl fmt::Debug for Column {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Column")
            .field("name", &self.name)
            .field("type", &self.type_)
            .finish()
    }
}