mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-13 16:32:56 +00:00
## Problem For #11992 I realised we need to get the type info before executing the query. This is important to know how to decode rows with custom types, eg the following query: ```sql CREATE TYPE foo AS ENUM ('foo','bar','baz'); SELECT ARRAY['foo'::foo, 'bar'::foo, 'baz'::foo] AS data; ``` Getting that to work was harder that it seems. The original tokio-postgres setup has a split between `Client` and `Connection`, where messages are passed between. Because multiple clients were supported, each client message included a dedicated response channel. Each request would be terminated by the `ReadyForQuery` message. The flow I opted to use for parsing types early would not trigger a `ReadyForQuery`. The flow is as follows: ``` PARSE "" // parse the user provided query DESCRIBE "" // describe the query, returning param/result type oids FLUSH // force postgres to flush the responses early // wait for descriptions // check if we know the types, if we don't then // setup the typeinfo query and execute it against each OID: PARSE typeinfo // prepare our typeinfo query DESCRIBE typeinfo FLUSH // force postgres to flush the responses early // wait for typeinfo statement // for each OID we don't know: BIND typeinfo EXECUTE FLUSH // wait for type info, might reveal more OIDs to inspect // close the typeinfo query, we cache the OID->type map and this is kinder to pgbouncer. CLOSE typeinfo // finally once we know all the OIDs: BIND "" // bind the user provided query - already parsed - to the user provided params EXECUTE // run the user provided query SYNC // commit the transaction ``` ## Summary of changes Please review commit by commit. The main challenge was allowing one query to issue multiple sub-queries. To do this I first made sure that the client could fully own the connection, which required removing any shared client state. I then had to replace the way responses are sent to the client, by using only a single permanent channel. This required some additional effort to track which query is being processed. Lastly I had to modify the query/typeinfo functions to not issue `sync` commands, so it would fit into the desired flow above. To note: the flow above does force an extra roundtrip into each query. I don't know yet if this has a measurable latency overhead.
151 lines
4.7 KiB
Rust
151 lines
4.7 KiB
Rust
use std::pin::Pin;
|
|
use std::task::{Context, Poll};
|
|
|
|
use bytes::BufMut;
|
|
use futures_util::{Stream, ready};
|
|
use postgres_protocol2::message::backend::Message;
|
|
use postgres_protocol2::message::frontend;
|
|
use postgres_types2::Format;
|
|
|
|
use crate::client::{CachedTypeInfo, InnerClient, Responses};
|
|
use crate::{Error, ReadyForQueryStatus, Row, Statement};
|
|
|
|
pub async fn query_txt<'a, S, I>(
|
|
client: &'a mut InnerClient,
|
|
typecache: &mut CachedTypeInfo,
|
|
query: &str,
|
|
params: I,
|
|
) -> Result<RowStream<'a>, Error>
|
|
where
|
|
S: AsRef<str>,
|
|
I: IntoIterator<Item = Option<S>>,
|
|
I::IntoIter: ExactSizeIterator,
|
|
{
|
|
let params = params.into_iter();
|
|
let mut client = client.start()?;
|
|
|
|
// Flow:
|
|
// 1. Parse the query
|
|
// 2. Inspect the row description for OIDs
|
|
// 3. If there's any OIDs we don't already know about, perform the typeinfo routine
|
|
// 4. Execute the query
|
|
// 5. Sync.
|
|
//
|
|
// The typeinfo routine:
|
|
// 1. Parse the typeinfo query
|
|
// 2. Execute the query on each OID
|
|
// 3. If the result does not match an OID we know, repeat 2.
|
|
|
|
// parse the query and get type info
|
|
let responses = client.send_with_flush(|buf| {
|
|
frontend::parse(
|
|
"", // unnamed prepared statement
|
|
query, // query to parse
|
|
std::iter::empty(), // give no type info
|
|
buf,
|
|
)
|
|
.map_err(Error::encode)?;
|
|
frontend::describe(b'S', "", buf).map_err(Error::encode)?;
|
|
Ok(())
|
|
})?;
|
|
|
|
match responses.next().await? {
|
|
Message::ParseComplete => {}
|
|
_ => return Err(Error::unexpected_message()),
|
|
}
|
|
|
|
match responses.next().await? {
|
|
Message::ParameterDescription(_) => {}
|
|
_ => return Err(Error::unexpected_message()),
|
|
};
|
|
|
|
let row_description = match responses.next().await? {
|
|
Message::RowDescription(body) => Some(body),
|
|
Message::NoData => None,
|
|
_ => return Err(Error::unexpected_message()),
|
|
};
|
|
|
|
let columns =
|
|
crate::prepare::parse_row_description(&mut client, typecache, row_description).await?;
|
|
|
|
let responses = client.send_with_sync(|buf| {
|
|
// Bind, pass params as text, retrieve as text
|
|
match frontend::bind(
|
|
"", // empty string selects the unnamed portal
|
|
"", // unnamed prepared statement
|
|
std::iter::empty(), // all parameters use the default format (text)
|
|
params,
|
|
|param, buf| match param {
|
|
Some(param) => {
|
|
buf.put_slice(param.as_ref().as_bytes());
|
|
Ok(postgres_protocol2::IsNull::No)
|
|
}
|
|
None => Ok(postgres_protocol2::IsNull::Yes),
|
|
},
|
|
Some(0), // all text
|
|
buf,
|
|
) {
|
|
Ok(()) => Ok(()),
|
|
Err(frontend::BindError::Conversion(e)) => Err(Error::to_sql(e, 0)),
|
|
Err(frontend::BindError::Serialization(e)) => Err(Error::encode(e)),
|
|
}?;
|
|
|
|
// Execute
|
|
frontend::execute("", 0, buf).map_err(Error::encode)?;
|
|
|
|
Ok(())
|
|
})?;
|
|
|
|
match responses.next().await? {
|
|
Message::BindComplete => {}
|
|
_ => return Err(Error::unexpected_message()),
|
|
}
|
|
|
|
Ok(RowStream {
|
|
responses,
|
|
statement: Statement::new("", columns),
|
|
command_tag: None,
|
|
status: ReadyForQueryStatus::Unknown,
|
|
output_format: Format::Text,
|
|
})
|
|
}
|
|
|
|
/// A stream of table rows.
|
|
pub struct RowStream<'a> {
|
|
responses: &'a mut Responses,
|
|
output_format: Format,
|
|
pub statement: Statement,
|
|
pub command_tag: Option<String>,
|
|
pub status: ReadyForQueryStatus,
|
|
}
|
|
|
|
impl Stream for RowStream<'_> {
|
|
type Item = Result<Row, Error>;
|
|
|
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
let this = self.get_mut();
|
|
loop {
|
|
match ready!(this.responses.poll_next(cx)?) {
|
|
Message::DataRow(body) => {
|
|
return Poll::Ready(Some(Ok(Row::new(
|
|
this.statement.clone(),
|
|
body,
|
|
this.output_format,
|
|
)?)));
|
|
}
|
|
Message::EmptyQueryResponse | Message::PortalSuspended => {}
|
|
Message::CommandComplete(body) => {
|
|
if let Ok(tag) = body.tag() {
|
|
this.command_tag = Some(tag.to_string());
|
|
}
|
|
}
|
|
Message::ReadyForQuery(status) => {
|
|
this.status = status.into();
|
|
return Poll::Ready(None);
|
|
}
|
|
_ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
|
|
}
|
|
}
|
|
}
|
|
}
|