mirror of https://github.com/neondatabase/neon.git (synced 2026-01-15 09:22:55 +00:00)
recursive type queries
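The heart of the change is resolving non-builtin type OIDs by querying pg_type recursively: arrays, domains, ranges, and composites all reference further OIDs. In Rust an async fn cannot recurse directly, because its future type would have to contain itself, so the diff routes the recursive call through a boxed future (get_type_rec wraps get_type in Box::pin). A minimal self-contained sketch of that pattern, with a toy resolve/resolve_rec pair standing in for get_type/get_type_rec (the names, the toy type graph, and the futures executor are illustrative assumptions, not the commit's API):

use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;

// Toy type graph: each OID may reference another OID (e.g. an array's element type).
type Oid = u32;

async fn resolve(graph: &HashMap<Oid, Option<Oid>>, oid: Oid) -> String {
    match graph.get(&oid).copied().flatten() {
        // The recursive call must go through a boxed future: `resolve` cannot
        // name its own future type, so we erase it behind Pin<Box<dyn Future>>.
        Some(elem) => format!("array<{}>", resolve_rec(graph, elem).await),
        None => format!("simple({oid})"),
    }
}

fn resolve_rec<'a>(
    graph: &'a HashMap<Oid, Option<Oid>>,
    oid: Oid,
) -> Pin<Box<dyn Future<Output = String> + 'a>> {
    Box::pin(resolve(graph, oid))
}

fn main() {
    let graph = HashMap::from([(1, None), (2, Some(1)), (3, Some(2))]);
    // 3 -> array of 2 -> array of 1 -> simple
    let out = futures::executor::block_on(resolve(&graph, 3));
    assert_eq!(out, "array<array<simple(1)>>");
}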
@@ -30,6 +30,7 @@ use crate::pg_client;
 use crate::pg_client::codec::FrontendMessage;
 use crate::pg_client::connection;
 use crate::pg_client::connection::RequestMessages;
+use crate::pg_client::prepare::TypeinfoPreparedQueries;
 
 use super::conn_pool::ConnInfo;
 use super::conn_pool::GlobalConnPool;
@@ -370,60 +371,59 @@ async fn query_raw_txt_as_json<'a, St, T>(
     params: Vec<Option<String>>,
     raw_output: bool,
     array_mode: bool,
-) -> Result<Value, pg_client::error::Error>
+) -> anyhow::Result<Value>
 where
-    St: AsyncRead + AsyncWrite + Unpin,
-    T: AsyncRead + AsyncWrite + Unpin,
+    St: AsyncRead + AsyncWrite + Unpin + Send,
+    T: AsyncRead + AsyncWrite + Unpin + Send,
 {
     let params = params.into_iter();
 
-    conn.prepare_and_execute("", "", query.as_str(), params)?;
-    conn.sync().await?;
+    let stmt_name = conn.statement_name();
+    let row_description = conn.prepare(&stmt_name, &query).await?;
+
+    let mut fields = vec![];
+    let mut columns = vec![];
+    let mut it = row_description.fields();
+    while let Some(field) = it.next().map_err(pg_client::error::Error::parse)? {
+        fields.push(json!({
+            "name": Value::String(field.name().to_owned()),
+            "dataTypeID": Value::Number(field.type_oid().into()),
+            "tableID": field.table_oid(),
+            "columnID": field.column_id(),
+            "dataTypeSize": field.type_size(),
+            "dataTypeModifier": field.type_modifier(),
+            "format": "text",
+        }));
+
+        let type_ = match Type::from_oid(field.type_oid()) {
+            Some(t) => t,
+            None => TypeinfoPreparedQueries::get_type(conn, field.type_oid()).await?,
+        };
+
+        columns.push(Column {
+            name: field.name().to_string(),
+            type_,
+        });
+    }
+
+    conn.execute("", &stmt_name, params)?;
+    conn.sync().await?;
 
     let mut rows = vec![];
-    let command_tag = match conn.stream_query_results().await? {
-        connection::QueryResult::NoRows(tag) => tag,
-        connection::QueryResult::Rows {
-            row_description,
-            mut row_stream,
-        } => {
-            let mut columns = vec![];
-            let mut it = row_description.fields();
-            while let Some(field) = it.next().map_err(pg_client::error::Error::parse)? {
-                fields.push(json!({
-                    "name": Value::String(field.name().to_owned()),
-                    "dataTypeID": Value::Number(field.type_oid().into()),
-                    "tableID": field.table_oid(),
-                    "columnID": field.column_id(),
-                    "dataTypeSize": field.type_size(),
-                    "dataTypeModifier": field.type_modifier(),
-                    "format": "text",
-                }));
-
-                let type_ = Type::from_oid(field.type_oid());
-                // let column = Column::new(field.name().to_string(), type_, field);
-                columns.push(Column {
-                    name: field.name().to_string(),
-                    type_,
-                });
-            }
-
-            let mut curret_size = 0;
-            while let Some(row) = row_stream.next().await.transpose()? {
-                curret_size += row.buffer().len();
-                if curret_size > MAX_RESPONSE_SIZE {
-                    todo!()
-                    // return Err(anyhow::anyhow!("response too large"));
-                }
-
-                rows.push(pg_text_row_to_json2(&row, &columns, raw_output, array_mode).unwrap());
-            }
-
-            row_stream.tag()
-        }
-    };
+    let mut row_stream = conn.stream_query_results().await?;
+
+    let mut curret_size = 0;
+    while let Some(row) = row_stream.next().await.transpose()? {
+        curret_size += row.buffer().len();
+        if curret_size > MAX_RESPONSE_SIZE {
+            return Err(anyhow::anyhow!("response too large"));
+        }
+
+        rows.push(pg_text_row_to_json2(&row, &columns, raw_output, array_mode).unwrap());
+    }
 
+    let command_tag = row_stream.tag();
     let command_tag = command_tag.tag()?;
     let mut command_tag_split = command_tag.split(' ');
     let command_tag_name = command_tag_split.next().unwrap_or_default();
@@ -448,7 +448,7 @@ where
 
 struct Column {
     name: String,
-    type_: Option<Type>,
+    type_: Type,
 }
 
 //
@@ -468,7 +468,7 @@ pub fn pg_text_row_to_json(
             None => Value::Null,
         }
     } else {
-        pg_text_to_json(pg_value, Some(column.type_()))?
+        pg_text_to_json(pg_value, column.type_())?
     };
     Ok((name, json_value))
 });
@@ -514,7 +514,7 @@ fn pg_text_row_to_json2(
             None => Value::Null,
         }
     } else {
-        pg_text_to_json(pg_value, column.type_.as_ref())?
+        pg_text_to_json(pg_value, &column.type_)?
     };
     Ok((name, json_value))
 });
@@ -536,22 +536,19 @@ fn pg_text_row_to_json2(
 //
 // Convert postgres text-encoded value to JSON value
 //
-pub fn pg_text_to_json(
-    pg_value: Option<&str>,
-    pg_type: Option<&Type>,
-) -> Result<Value, anyhow::Error> {
+pub fn pg_text_to_json(pg_value: Option<&str>, pg_type: &Type) -> Result<Value, anyhow::Error> {
     if let Some(val) = pg_value {
-        if let Some(Kind::Array(elem_type)) = pg_type.map(|t| t.kind()) {
-            return pg_array_parse(val, Some(elem_type));
+        if let Kind::Array(elem_type) = pg_type.kind() {
+            return pg_array_parse(val, &elem_type);
         }
 
         match pg_type {
-            Some(&Type::BOOL) => Ok(Value::Bool(val == "t")),
-            Some(&Type::INT2 | &Type::INT4) => {
+            &Type::BOOL => Ok(Value::Bool(val == "t")),
+            &Type::INT2 | &Type::INT4 => {
                 let val = val.parse::<i32>()?;
                 Ok(Value::Number(serde_json::Number::from(val)))
             }
-            Some(&Type::FLOAT4 | &Type::FLOAT8) => {
+            &Type::FLOAT4 | &Type::FLOAT8 => {
                 let fval = val.parse::<f64>()?;
                 let num = serde_json::Number::from_f64(fval);
                 if let Some(num) = num {
@@ -563,7 +560,7 @@ pub fn pg_text_to_json(
                     Ok(Value::String(val.to_string()))
                 }
             }
-            Some(&Type::JSON | &Type::JSONB) => Ok(serde_json::from_str(val)?),
+            &Type::JSON | &Type::JSONB => Ok(serde_json::from_str(val)?),
             _ => Ok(Value::String(val.to_string())),
         }
     } else {
@@ -578,13 +575,13 @@ pub fn pg_text_to_json(
 // values. Unlike postgres we don't check that all nested arrays have the same
 // dimensions, we just return them as is.
 //
-fn pg_array_parse(pg_array: &str, elem_type: Option<&Type>) -> Result<Value, anyhow::Error> {
+fn pg_array_parse(pg_array: &str, elem_type: &Type) -> Result<Value, anyhow::Error> {
     _pg_array_parse(pg_array, elem_type, false).map(|(v, _)| v)
 }
 
 fn _pg_array_parse(
     pg_array: &str,
-    elem_type: Option<&Type>,
+    elem_type: &Type,
     nested: bool,
 ) -> Result<(Value, usize), anyhow::Error> {
     let mut pg_array_chr = pg_array.char_indices();
@@ -605,7 +602,7 @@ fn _pg_array_parse(
     fn push_checked(
         entry: &mut String,
         entries: &mut Vec<Value>,
-        elem_type: Option<&Type>,
+        elem_type: &Type,
     ) -> Result<(), anyhow::Error> {
         if !entry.is_empty() {
             // While in usual postgres response we get nulls as None and everything else
@@ -731,43 +728,34 @@ mod tests {
     #[test]
     fn test_atomic_types_parse() {
         assert_eq!(
-            pg_text_to_json(Some("foo"), Some(&Type::TEXT)).unwrap(),
+            pg_text_to_json(Some("foo"), &Type::TEXT).unwrap(),
             json!("foo")
         );
-        assert_eq!(
-            pg_text_to_json(None, Some(&Type::TEXT)).unwrap(),
-            json!(null)
-        );
-        assert_eq!(
-            pg_text_to_json(Some("42"), Some(&Type::INT4)).unwrap(),
-            json!(42)
-        );
-        assert_eq!(
-            pg_text_to_json(Some("42"), Some(&Type::INT2)).unwrap(),
-            json!(42)
-        );
+        assert_eq!(pg_text_to_json(None, &Type::TEXT).unwrap(), json!(null));
+        assert_eq!(pg_text_to_json(Some("42"), &Type::INT4).unwrap(), json!(42));
+        assert_eq!(pg_text_to_json(Some("42"), &Type::INT2).unwrap(), json!(42));
         assert_eq!(
-            pg_text_to_json(Some("42"), Some(&Type::INT8)).unwrap(),
+            pg_text_to_json(Some("42"), &Type::INT8).unwrap(),
             json!("42")
         );
         assert_eq!(
-            pg_text_to_json(Some("42.42"), Some(&Type::FLOAT8)).unwrap(),
+            pg_text_to_json(Some("42.42"), &Type::FLOAT8).unwrap(),
             json!(42.42)
         );
         assert_eq!(
-            pg_text_to_json(Some("42.42"), Some(&Type::FLOAT4)).unwrap(),
+            pg_text_to_json(Some("42.42"), &Type::FLOAT4).unwrap(),
             json!(42.42)
         );
         assert_eq!(
-            pg_text_to_json(Some("NaN"), Some(&Type::FLOAT4)).unwrap(),
+            pg_text_to_json(Some("NaN"), &Type::FLOAT4).unwrap(),
             json!("NaN")
         );
         assert_eq!(
-            pg_text_to_json(Some("Infinity"), Some(&Type::FLOAT4)).unwrap(),
+            pg_text_to_json(Some("Infinity"), &Type::FLOAT4).unwrap(),
             json!("Infinity")
         );
         assert_eq!(
-            pg_text_to_json(Some("-Infinity"), Some(&Type::FLOAT4)).unwrap(),
+            pg_text_to_json(Some("-Infinity"), &Type::FLOAT4).unwrap(),
             json!("-Infinity")
         );
@@ -777,7 +765,7 @@ mod tests {
         assert_eq!(
             pg_text_to_json(
                 Some(r#"{"s":"str","n":42,"f":4.2,"a":[null,3,"a"]}"#),
-                Some(&Type::JSONB)
+                &Type::JSONB
             )
             .unwrap(),
             json
@@ -787,7 +775,7 @@ mod tests {
     #[test]
     fn test_pg_array_parse_text() {
         fn pt(pg_arr: &str) -> Value {
-            pg_array_parse(pg_arr, Some(&Type::TEXT)).unwrap()
+            pg_array_parse(pg_arr, &Type::TEXT).unwrap()
         }
         assert_eq!(
             pt(r#"{"aa\"\\\,a",cha,"bbbb"}"#),
@@ -810,7 +798,7 @@ mod tests {
     #[test]
     fn test_pg_array_parse_bool() {
         fn pb(pg_arr: &str) -> Value {
-            pg_array_parse(pg_arr, Some(&Type::BOOL)).unwrap()
+            pg_array_parse(pg_arr, &Type::BOOL).unwrap()
         }
         assert_eq!(pb(r#"{t,f,t}"#), json!([true, false, true]));
         assert_eq!(pb(r#"{{t,f,t}}"#), json!([[true, false, true]]));
@@ -827,7 +815,7 @@ mod tests {
     #[test]
     fn test_pg_array_parse_numbers() {
         fn pn(pg_arr: &str, ty: &Type) -> Value {
-            pg_array_parse(pg_arr, Some(ty)).unwrap()
+            pg_array_parse(pg_arr, ty).unwrap()
         }
         assert_eq!(pn(r#"{1,2,3}"#, &Type::INT4), json!([1, 2, 3]));
        assert_eq!(pn(r#"{1,2,3}"#, &Type::INT2), json!([1, 2, 3]));
@@ -855,7 +843,7 @@ mod tests {
     #[test]
     fn test_pg_array_with_decoration() {
         fn p(pg_arr: &str) -> Value {
-            pg_array_parse(pg_arr, Some(&Type::INT2)).unwrap()
+            pg_array_parse(pg_arr, &Type::INT2).unwrap()
         }
         assert_eq!(
             p(r#"[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}"#),
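Taken together, the tests above pin down the text-format decoding rules that the new non-optional &Type signature commits to: booleans arrive as t/f, INT2/INT4 become JSON numbers, INT8 is deliberately kept as a string (an i64 may exceed the integer range a JSON reader's f64 can represent exactly), non-finite floats become the strings "NaN"/"Infinity"/"-Infinity", and JSON/JSONB is parsed through. A minimal self-contained sketch of the same mapping, with a hypothetical PgKind enum standing in for tokio_postgres::types::Type (an illustration, not the crate's API):

use serde_json::{json, Value};

// Simplified stand-in for tokio_postgres::types::Type (assumption for illustration).
enum PgKind { Bool, Int4, Int8, Float8, Json, Text }

fn text_to_json(val: &str, kind: &PgKind) -> anyhow::Result<Value> {
    Ok(match kind {
        PgKind::Bool => Value::Bool(val == "t"),
        PgKind::Int4 => json!(val.parse::<i32>()?),
        // i64 may not round-trip through f64, so keep the textual form.
        PgKind::Int8 => Value::String(val.to_string()),
        PgKind::Float8 => match serde_json::Number::from_f64(val.parse::<f64>()?) {
            Some(n) => Value::Number(n),
            // NaN / Infinity have no JSON number representation.
            None => Value::String(val.to_string()),
        },
        PgKind::Json => serde_json::from_str(val)?,
        PgKind::Text => Value::String(val.to_string()),
    })
}

fn main() -> anyhow::Result<()> {
    assert_eq!(text_to_json("t", &PgKind::Bool)?, json!(true));
    assert_eq!(text_to_json("42", &PgKind::Int8)?, json!("42"));
    assert_eq!(text_to_json("NaN", &PgKind::Float8)?, json!("NaN"));
    Ok(())
}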
@@ -1,23 +1,23 @@
 use super::codec::{BackendMessages, FrontendMessage, PostgresCodec};
 use super::error::Error;
+use super::prepare::TypeinfoPreparedQueries;
 use bytes::{BufMut, BytesMut};
 use fallible_iterator::FallibleIterator;
 use futures::channel::mpsc;
-use futures::{Sink, StreamExt};
-use postgres_protocol::authentication;
+use futures::{SinkExt, Stream};
+use hashbrown::HashMap;
+use postgres_protocol::Oid;
 use postgres_protocol::message::backend::{
     BackendKeyDataBody, CommandCompleteBody, DataRowBody, Message, ReadyForQueryBody,
     RowDescriptionBody,
 };
 use postgres_protocol::message::frontend;
-use std::collections::{HashMap, VecDeque};
+use std::collections::VecDeque;
 use std::future::poll_fn;
 use std::pin::Pin;
 use std::task::{ready, Context, Poll};
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
-use tokio::net::TcpStream;
-use tokio_native_tls::{native_tls, TlsConnector, TlsStream};
+use tokio::io::{AsyncRead, AsyncWrite};
+use tokio_postgres::maybe_tls_stream::MaybeTlsStream;
+use tokio_postgres::types::Type;
 use tokio_util::codec::Framed;
@@ -41,105 +41,7 @@ pub struct RawConnection<S, T> {
     pub buf: BytesMut,
 }
 
-// enum MaybeTlsStream {
-//     NoTls(TcpStream),
-//     Tls(TlsStream<TcpStream>),
-// }
-
-// impl Unpin for MaybeTlsStream {}
-
-// impl AsyncRead for MaybeTlsStream {
-//     fn poll_read(
-//         self: Pin<&mut Self>,
-//         cx: &mut Context<'_>,
-//         buf: &mut tokio::io::ReadBuf<'_>,
-//     ) -> Poll<std::io::Result<()>> {
-//         match self.get_mut() {
-//             MaybeTlsStream::NoTls(no_tls) => Pin::new(no_tls).poll_read(cx, buf),
-//             MaybeTlsStream::Tls(tls) => Pin::new(tls).poll_read(cx, buf),
-//         }
-//     }
-// }
-// impl AsyncWrite for MaybeTlsStream {
-//     fn poll_write(
-//         self: Pin<&mut Self>,
-//         cx: &mut Context<'_>,
-//         buf: &[u8],
-//     ) -> Poll<Result<usize, std::io::Error>> {
-//         match self.get_mut() {
-//             MaybeTlsStream::NoTls(no_tls) => Pin::new(no_tls).poll_write(cx, buf),
-//             MaybeTlsStream::Tls(tls) => Pin::new(tls).poll_write(cx, buf),
-//         }
-//     }
-
-//     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
-//         match self.get_mut() {
-//             MaybeTlsStream::NoTls(no_tls) => Pin::new(no_tls).poll_flush(cx),
-//             MaybeTlsStream::Tls(tls) => Pin::new(tls).poll_flush(cx),
-//         }
-//     }
-
-//     fn poll_shutdown(
-//         self: Pin<&mut Self>,
-//         cx: &mut Context<'_>,
-//     ) -> Poll<Result<(), std::io::Error>> {
-//         match self.get_mut() {
-//             MaybeTlsStream::NoTls(no_tls) => Pin::new(no_tls).poll_shutdown(cx),
-//             MaybeTlsStream::Tls(tls) => Pin::new(tls).poll_shutdown(cx),
-//         }
-//     }
-
-//     fn poll_write_vectored(
-//         self: Pin<&mut Self>,
-//         cx: &mut Context<'_>,
-//         bufs: &[std::io::IoSlice<'_>],
-//     ) -> Poll<Result<usize, std::io::Error>> {
-//         match self.get_mut() {
-//             MaybeTlsStream::NoTls(no_tls) => Pin::new(no_tls).poll_write_vectored(cx, bufs),
-//             MaybeTlsStream::Tls(tls) => Pin::new(tls).poll_write_vectored(cx, bufs),
-//         }
-//     }
-
-//     fn is_write_vectored(&self) -> bool {
-//         match self {
-//             MaybeTlsStream::NoTls(no_tls) => no_tls.is_write_vectored(),
-//             MaybeTlsStream::Tls(tls) => tls.is_write_vectored(),
-//         }
-//     }
-// }
-
 impl<S: AsyncRead + AsyncWrite + Unpin, T: AsyncRead + AsyncWrite + Unpin> RawConnection<S, T> {
-    // pub(crate) async fn connect(
-    //     mut stream: TcpStream,
-    //     tls_domain: Option<&str>,
-    // ) -> Result<RawConnection<S, T>, Error> {
-    //     let mut buf = BytesMut::new();
-
-    //     let stream = if let Some(tls_domain) = tls_domain {
-    //         frontend::ssl_request(&mut buf);
-    //         stream
-    //             .write_all_buf(&mut buf.split().freeze())
-    //             .await
-    //             .unwrap();
-    //         let bit = stream.read_u8().await.map_err(Error::io)?;
-    //         if bit != b'S' {
-    //             return Err(Error::closed());
-    //         }
-
-    //         let tls = native_tls::TlsConnector::new().map_err(Error::tls)?;
-    //         let tls = TlsConnector::from(tls)
-    //             .connect(tls_domain, stream)
-    //             .await
-    //             .map_err(Error::tls)?;
-
-    //         MaybeTlsStream::Tls(tls)
-    //     } else {
-    //         MaybeTlsStream::Raw(stream)
-    //     };
-
-    //     Ok(RawConnection::new(Framed::new(stream, PostgresCodec), buf))
-    // }
-
     pub fn new(
         stream: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
         buf: BytesMut,
@@ -193,8 +95,11 @@ impl<S: AsyncRead + AsyncWrite + Unpin, T: AsyncRead + AsyncWrite + Unpin> RawCo
 }
 
 pub struct Connection<S, T> {
-    raw: RawConnection<S, T>,
-    key: BackendKeyDataBody,
+    stmt_counter: usize,
+    pub typeinfo: Option<TypeinfoPreparedQueries>,
+    pub typecache: HashMap<Oid, Type>,
+    pub raw: RawConnection<S, T>,
+    // key: BackendKeyDataBody,
 }
 
 impl<S: AsyncRead + AsyncWrite + Unpin, T: AsyncRead + AsyncWrite + Unpin> Connection<S, T> {
@@ -263,19 +168,27 @@ impl<S: AsyncRead + AsyncWrite + Unpin, T: AsyncRead + AsyncWrite + Unpin> Conne
     //     Ok(Self { raw, key })
     // }
 
-    pub fn prepare_and_execute(
-        &mut self,
-        portal: &str,
-        name: &str,
-        query: &str,
-        params: impl IntoIterator<Item = Option<impl AsRef<str>>>,
-    ) -> std::io::Result<()> {
-        self.prepare(name, query)?;
-        self.execute(portal, name, params)
-    }
+    // pub fn prepare_and_execute(
+    //     &mut self,
+    //     portal: &str,
+    //     name: &str,
+    //     query: &str,
+    //     params: impl IntoIterator<Item = Option<impl AsRef<str>>>,
+    // ) -> std::io::Result<()> {
+    //     self.prepare(name, query)?;
+    //     self.execute(portal, name, params)
+    // }
+
+    pub fn statement_name(&mut self) -> String {
+        self.stmt_counter += 1;
+        format!("s{}", self.stmt_counter)
+    }
 
-    pub fn prepare(&mut self, name: &str, query: &str) -> std::io::Result<()> {
-        frontend::parse(name, query, std::iter::empty(), &mut self.raw.buf)
+    pub async fn prepare(&mut self, name: &str, query: &str) -> Result<RowDescriptionBody, Error> {
+        frontend::parse(name, query, std::iter::empty(), &mut self.raw.buf)?;
+        frontend::describe(b'S', name, &mut self.raw.buf)?;
+        self.sync().await?;
+        self.wait_for_prepare().await
     }
 
     pub fn execute(
@@ -303,7 +216,6 @@ impl<S: AsyncRead + AsyncWrite + Unpin, T: AsyncRead + AsyncWrite + Unpin> Conne
             frontend::BindError::Conversion(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
             frontend::BindError::Serialization(io) => io,
         })?;
-        frontend::describe(b'P', portal, &mut self.raw.buf)?;
         frontend::execute(portal, 0, &mut self.raw.buf)
     }
 
@@ -312,35 +224,20 @@ impl<S: AsyncRead + AsyncWrite + Unpin, T: AsyncRead + AsyncWrite + Unpin> Conne
         self.raw.send().await
     }
 
-    pub async fn wait_for_prepare(&mut self) -> Result<Option<RowDescriptionBody>, Error> {
+    pub async fn wait_for_prepare(&mut self) -> Result<RowDescriptionBody, Error> {
         let Message::ParseComplete = self.raw.next_message().await? else { return Err(Error::expecting("parse")) };
-        let Message::BindComplete = self.raw.next_message().await? else { return Err(Error::expecting("bind")) };
-        match self.raw.next_message().await? {
-            Message::RowDescription(desc) => Ok(QueryResult::Rows {
-                row_stream: RowStream::Stream(&mut self.raw),
-                row_description: desc,
-            }),
-            Message::NoData => {
-                let Message::CommandComplete(tag) = self.raw.next_message().await? else { return Err(Error::expecting("command completion")) };
-                Ok(QueryResult::NoRows(tag))
-            }
-            _ => Err(Error::expecting("query results")),
-        }
+        let Message::ParameterDescription(_) = self.raw.next_message().await? else { return Err(Error::expecting("param description")) };
+        let Message::RowDescription(desc) = self.raw.next_message().await? else { return Err(Error::expecting("row description")) };
+
+        self.wait_for_ready().await?;
+
+        Ok(desc)
     }
 
     pub async fn stream_query_results(&mut self) -> Result<RowStream<'_, S, T>, Error> {
-        let Message::ParseComplete = self.raw.next_message().await? else { return Err(Error::expecting("parse")) };
+        // let Message::ParseComplete = self.raw.next_message().await? else { return Err(Error::expecting("parse")) };
         let Message::BindComplete = self.raw.next_message().await? else { return Err(Error::expecting("bind")) };
-        match self.raw.next_message().await? {
-            Message::RowDescription(desc) => Ok(QueryResult::Rows {
-                row_stream: RowStream::Stream(&mut self.raw),
-                row_description: desc,
-            }),
-            Message::NoData => {
-                let Message::CommandComplete(tag) = self.raw.next_message().await? else { return Err(Error::expecting("command completion")) };
-                Ok(QueryResult::NoRows(tag))
-            }
-            _ => Err(Error::expecting("query results")),
-        }
+        Ok(RowStream::Stream(&mut self.raw))
     }
 
     pub async fn wait_for_ready(&mut self) -> Result<ReadyForQueryBody, Error> {
@@ -353,14 +250,6 @@ impl<S: AsyncRead + AsyncWrite + Unpin, T: AsyncRead + AsyncWrite + Unpin> Conne
         }
     }
 
-// pub enum QueryResult<'a, S, T> {
-//     NoRows(CommandCompleteBody),
-//     Rows {
-//         row_description: RowDescriptionBody,
-//         row_stream: RowStream<'a, S, T>,
-//     },
-// }
-
 pub enum RowStream<'a, S, T> {
     Stream(&'a mut RawConnection<S, T>),
     Complete(CommandCompleteBody),
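The new prepare/wait_for_prepare pair above is a straight extended-query-protocol exchange: buffer Parse, then Describe of the statement ('S'), then Sync; flush; then expect ParseComplete, ParameterDescription, RowDescription (or NoData), and finally ReadyForQuery. A minimal sketch of just the frontend side using postgres_protocol (buffering only, no socket; the hypothetical statement name "s1" and SQL are illustrative):

use bytes::BytesMut;
use postgres_protocol::message::frontend;

fn main() -> std::io::Result<()> {
    let mut buf = BytesMut::new();

    // Parse: name the statement and let the server infer parameter types.
    frontend::parse("s1", "SELECT $1::int4 + 1", std::iter::empty(), &mut buf)?;
    // Describe the *statement* ('S'): the server replies with
    // ParameterDescription + RowDescription before any Execute.
    frontend::describe(b'S', "s1", &mut buf)?;
    // Sync ends the implicit transaction and elicits ReadyForQuery.
    frontend::sync(&mut buf);

    // `buf` now holds the Parse/Describe/Sync bytes ready to be written to the
    // server; the reply sequence (ParseComplete, ParameterDescription,
    // RowDescription, ReadyForQuery) is exactly what wait_for_prepare consumes.
    assert!(!buf.is_empty());
    Ok(())
}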
@@ -2,5 +2,4 @@
 pub mod codec;
 pub mod connection;
 pub mod error;
-// mod prepare;
-// mod pg_type;
+pub mod prepare;
File diff suppressed because it is too large
@@ -1,21 +1,14 @@
-// use crate::client::InnerClient;
-// use crate::codec::FrontendMessage;
-// use crate::connection::RequestMessages;
-// use crate::error::SqlState;
-// use crate::types::{Field, Kind, Oid, Type};
-// use crate::{query, slice_iter};
-// use crate::{Column, Error, Statement};
-use bytes::Bytes;
 use fallible_iterator::FallibleIterator;
-// use futures_util::{pin_mut, TryStreamExt};
-// use log::debug;
-use postgres_protocol::message::backend::Message;
+use futures::StreamExt;
+use postgres_protocol::message::backend::{DataRowRanges, Message};
 use postgres_protocol::message::frontend;
-use tokio_postgres::types::Type;
 use std::future::Future;
 use std::pin::Pin;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
+use tokio::io::{AsyncRead, AsyncWrite};
+use tokio_postgres::types::{Field, Kind, Oid, ToSql, Type};
 
 use super::connection::Connection;
 use super::error::Error;
 
 const TYPEINFO_QUERY: &str = "\
 SELECT t.typname, t.typtype, t.typelem, r.rngsubtype, t.typbasetype, n.nspname, t.typrelid
@@ -25,14 +18,6 @@ INNER JOIN pg_catalog.pg_namespace n ON t.typnamespace = n.oid
 WHERE t.oid = $1
 ";
 
-// Range types weren't added until Postgres 9.2, so pg_range may not exist
-const TYPEINFO_FALLBACK_QUERY: &str = "\
-SELECT t.typname, t.typtype, t.typelem, NULL::OID, t.typbasetype, n.nspname, t.typrelid
-FROM pg_catalog.pg_type t
-INNER JOIN pg_catalog.pg_namespace n ON t.typnamespace = n.oid
-WHERE t.oid = $1
-";
-
 const TYPEINFO_ENUM_QUERY: &str = "\
 SELECT enumlabel
 FROM pg_catalog.pg_enum
@@ -40,14 +25,6 @@ WHERE enumtypid = $1
 ORDER BY enumsortorder
 ";
 
-// Postgres 9.0 didn't have enumsortorder
-const TYPEINFO_ENUM_FALLBACK_QUERY: &str = "\
-SELECT enumlabel
-FROM pg_catalog.pg_enum
-WHERE enumtypid = $1
-ORDER BY oid
-";
-
 const TYPEINFO_COMPOSITE_QUERY: &str = "\
 SELECT attname, atttypid
 FROM pg_catalog.pg_attribute
@@ -57,207 +34,255 @@ AND attnum > 0
 ORDER BY attnum
 ";
 
 static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
 
-// pub async fn prepare(
-//     client: &Arc<InnerClient>,
-//     query: &str,
-//     types: &[Type],
-// ) -> Result<Statement, Error> {
-//     let name = format!("s{}", NEXT_ID.fetch_add(1, Ordering::SeqCst));
-//     let buf = encode(client, &name, query, types)?;
-//     let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
-
-//     match responses.next().await? {
-//         Message::ParseComplete => {}
-//         _ => return Err(Error::unexpected_message()),
-//     }
-
-//     let parameter_description = match responses.next().await? {
-//         Message::ParameterDescription(body) => body,
-//         _ => return Err(Error::unexpected_message()),
-//     };
-
-//     let row_description = match responses.next().await? {
-//         Message::RowDescription(body) => Some(body),
-//         Message::NoData => None,
-//         _ => return Err(Error::unexpected_message()),
-//     };
-
-//     let mut parameters = vec![];
-//     let mut it = parameter_description.parameters();
-//     while let Some(oid) = it.next().map_err(Error::parse)? {
-//         let type_ = get_type(client, oid).await?;
-//         parameters.push(type_);
-//     }
-
-//     let mut columns = vec![];
-//     if let Some(row_description) = row_description {
-//         let mut it = row_description.fields();
-//         while let Some(field) = it.next().map_err(Error::parse)? {
-//             let type_ = get_type(client, field.type_oid()).await?;
-//             let column = Column::new(field.name().to_string(), type_, field);
-//             columns.push(column);
-//         }
-//     }
-
-//     Ok(Statement::new(client, name, parameters, columns))
-// }
-
-// fn prepare_rec<'a>(
-//     client: &'a Arc<InnerClient>,
-//     query: &'a str,
-//     types: &'a [Type],
-// ) -> Pin<Box<dyn Future<Output = Result<Statement, Error>> + 'a + Send>> {
-//     Box::pin(prepare(client, query, types))
-// }
-
-// fn encode(client: &InnerClient, name: &str, query: &str, types: &[Type]) -> Result<Bytes, Error> {
-//     if types.is_empty() {
-//         debug!("preparing query {}: {}", name, query);
-//     } else {
-//         debug!("preparing query {} with types {:?}: {}", name, types, query);
-//     }
-
-//     client.with_buf(|buf| {
-//         frontend::parse(name, query, types.iter().map(Type::oid), buf).map_err(Error::encode)?;
-//         frontend::describe(b'S', name, buf).map_err(Error::encode)?;
-//         frontend::sync(buf);
-//         Ok(buf.split().freeze())
-//     })
-// }
-
-pub async fn get_type(client: &Arc<InnerClient>, oid: Oid) -> Result<Type, Error> {
-    if let Some(type_) = Type::from_oid(oid) {
-        return Ok(type_);
-    }
-
-    if let Some(type_) = client.type_(oid) {
-        return Ok(type_);
-    }
-
-    let stmt = typeinfo_statement(client).await?;
-
-    let rows = query::query(client, stmt, slice_iter(&[&oid])).await?;
-    pin_mut!(rows);
-
-    let row = match rows.try_next().await? {
-        Some(row) => row,
-        None => return Err(Error::unexpected_message()),
-    };
-
-    let name: String = row.try_get(0)?;
-    let type_: i8 = row.try_get(1)?;
-    let elem_oid: Oid = row.try_get(2)?;
-    let rngsubtype: Option<Oid> = row.try_get(3)?;
-    let basetype: Oid = row.try_get(4)?;
-    let schema: String = row.try_get(5)?;
-    let relid: Oid = row.try_get(6)?;
-
-    let kind = if type_ == b'e' as i8 {
-        let variants = get_enum_variants(client, oid).await?;
-        Kind::Enum(variants)
-    } else if type_ == b'p' as i8 {
-        Kind::Pseudo
-    } else if basetype != 0 {
-        let type_ = get_type_rec(client, basetype).await?;
-        Kind::Domain(type_)
-    } else if elem_oid != 0 {
-        let type_ = get_type_rec(client, elem_oid).await?;
-        Kind::Array(type_)
-    } else if relid != 0 {
-        let fields = get_composite_fields(client, relid).await?;
-        Kind::Composite(fields)
-    } else if let Some(rngsubtype) = rngsubtype {
-        let type_ = get_type_rec(client, rngsubtype).await?;
-        Kind::Range(type_)
-    } else {
-        Kind::Simple
-    };
-
-    let type_ = Type::new(name, oid, kind, schema);
-    client.set_type(oid, &type_);
-
-    Ok(type_)
-}
-
-fn get_type_rec<'a>(
-    client: &'a Arc<InnerClient>,
-    oid: Oid,
-) -> Pin<Box<dyn Future<Output = Result<Type, Error>> + Send + 'a>> {
-    Box::pin(get_type(client, oid))
-}
-
-async fn typeinfo_statement(client: &Arc<InnerClient>) -> Result<Statement, Error> {
-    if let Some(stmt) = client.typeinfo() {
-        return Ok(stmt);
-    }
-
-    let stmt = match prepare_rec(client, TYPEINFO_QUERY, &[]).await {
-        Ok(stmt) => stmt,
-        Err(ref e) if e.code() == Some(&SqlState::UNDEFINED_TABLE) => {
-            prepare_rec(client, TYPEINFO_FALLBACK_QUERY, &[]).await?
-        }
-        Err(e) => return Err(e),
-    };
-
-    client.set_typeinfo(&stmt);
-    Ok(stmt)
-}
-
-async fn get_enum_variants(client: &Arc<InnerClient>, oid: Oid) -> Result<Vec<String>, Error> {
-    let stmt = typeinfo_enum_statement(client).await?;
-
-    query::query(client, stmt, slice_iter(&[&oid]))
-        .await?
-        .and_then(|row| async move { row.try_get(0) })
-        .try_collect()
-        .await
-}
-
-async fn typeinfo_enum_statement(client: &Arc<InnerClient>) -> Result<Statement, Error> {
-    if let Some(stmt) = client.typeinfo_enum() {
-        return Ok(stmt);
-    }
-
-    let stmt = match prepare_rec(client, TYPEINFO_ENUM_QUERY, &[]).await {
-        Ok(stmt) => stmt,
-        Err(ref e) if e.code() == Some(&SqlState::UNDEFINED_COLUMN) => {
-            prepare_rec(client, TYPEINFO_ENUM_FALLBACK_QUERY, &[]).await?
-        }
-        Err(e) => return Err(e),
-    };
-
-    client.set_typeinfo_enum(&stmt);
-    Ok(stmt)
-}
-
-async fn get_composite_fields(client: &Arc<InnerClient>, oid: Oid) -> Result<Vec<Field>, Error> {
-    let stmt = typeinfo_composite_statement(client).await?;
-
-    let rows = query::query(client, stmt, slice_iter(&[&oid]))
-        .await?
-        .try_collect::<Vec<_>>()
-        .await?;
-
-    let mut fields = vec![];
-    for row in rows {
-        let name = row.try_get(0)?;
-        let oid = row.try_get(1)?;
-        let type_ = get_type_rec(client, oid).await?;
-        fields.push(Field::new(name, type_));
-    }
-
-    Ok(fields)
-}
-
-async fn typeinfo_composite_statement(client: &Arc<InnerClient>) -> Result<Statement, Error> {
-    if let Some(stmt) = client.typeinfo_composite() {
-        return Ok(stmt);
-    }
-
-    let stmt = prepare_rec(client, TYPEINFO_COMPOSITE_QUERY, &[]).await?;
-
-    client.set_typeinfo_composite(&stmt);
-    Ok(stmt)
-}
+#[derive(Clone)]
+pub struct TypeinfoPreparedQueries {
+    query: String,
+    enum_query: String,
+    composite_query: String,
+}
+
+fn map_is_null(x: tokio_postgres::types::IsNull) -> postgres_protocol::IsNull {
+    match x {
+        tokio_postgres::types::IsNull::Yes => postgres_protocol::IsNull::Yes,
+        tokio_postgres::types::IsNull::No => postgres_protocol::IsNull::No,
+    }
+}
+
+fn read_column<'a, T: tokio_postgres::types::FromSql<'a>>(
+    buffer: &'a [u8],
+    type_: &Type,
+    ranges: &mut DataRowRanges<'a>,
+) -> Result<T, Error> {
+    let range = ranges.next()?;
+    match range {
+        Some(range) => T::from_sql_nullable(type_, range.map(|r| &buffer[r])),
+        None => T::from_sql_null(type_),
+    }
+    .map_err(|e| Error::from_sql(e, 0))
+}
+
+impl TypeinfoPreparedQueries {
+    pub async fn new<
+        S: AsyncRead + AsyncWrite + Unpin + Send,
+        T: AsyncRead + AsyncWrite + Unpin + Send,
+    >(
+        c: &mut Connection<S, T>,
+    ) -> Result<Self, Error> {
+        if let Some(ti) = &c.typeinfo {
+            return Ok(ti.clone());
+        }
+
+        let query = c.statement_name();
+        let enum_query = c.statement_name();
+        let composite_query = c.statement_name();
+
+        frontend::parse(&query, TYPEINFO_QUERY, [Type::OID.oid()], &mut c.raw.buf)?;
+        frontend::parse(
+            &enum_query,
+            TYPEINFO_ENUM_QUERY,
+            [Type::OID.oid()],
+            &mut c.raw.buf,
+        )?;
+        c.sync().await?;
+        frontend::parse(
+            &composite_query,
+            TYPEINFO_COMPOSITE_QUERY,
+            [Type::OID.oid()],
+            &mut c.raw.buf,
+        )?;
+        c.sync().await?;
+
+        let Message::ParseComplete = c.raw.next_message().await? else { return Err(Error::expecting("parse")) };
+        let Message::ParseComplete = c.raw.next_message().await? else { return Err(Error::expecting("parse")) };
+        let Message::ParseComplete = c.raw.next_message().await? else { return Err(Error::expecting("parse")) };
+        c.wait_for_ready().await?;
+
+        Ok(c.typeinfo
+            .insert(TypeinfoPreparedQueries {
+                query,
+                enum_query,
+                composite_query,
+            })
+            .clone())
+    }
+
+    fn get_type_rec<
+        S: AsyncRead + AsyncWrite + Unpin + Send,
+        T: AsyncRead + AsyncWrite + Unpin + Send,
+    >(
+        c: &mut Connection<S, T>,
+        oid: Oid,
+    ) -> Pin<Box<dyn Future<Output = Result<Type, Error>> + Send + '_>> {
+        Box::pin(Self::get_type(c, oid))
+    }
+
+    pub async fn get_type<
+        S: AsyncRead + AsyncWrite + Unpin + Send,
+        T: AsyncRead + AsyncWrite + Unpin + Send,
+    >(
+        c: &mut Connection<S, T>,
+        oid: Oid,
+    ) -> Result<Type, Error> {
+        if let Some(type_) = Type::from_oid(oid) {
+            return Ok(type_);
+        }
+
+        if let Some(type_) = c.typecache.get(&oid) {
+            return Ok(type_.clone());
+        }
+
+        let queries = Self::new(c).await?;
+
+        frontend::bind(
+            "",
+            &queries.query,
+            [1], // the only parameter is in binary format
+            [oid],
+            |param, buf| param.to_sql(&Type::OID, buf).map(map_is_null),
+            Some(1), // binary return type
+            &mut c.raw.buf,
+        )
+        .map_err(|e| match e {
+            frontend::BindError::Conversion(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
+            frontend::BindError::Serialization(io) => io,
+        })?;
+        frontend::execute("", 0, &mut c.raw.buf)?;
+
+        c.sync().await?;
+
+        let mut stream = c.stream_query_results().await?;
+
+        let Some(row) = stream.next().await.transpose()? else {
+            todo!()
+        };
+
+        let b = row.buffer();
+        let mut ranges = row.ranges();
+
+        let name: String = read_column(b, &Type::NAME, &mut ranges)?;
+        let type_: i8 = read_column(b, &Type::CHAR, &mut ranges)?;
+        let elem_oid: Oid = read_column(b, &Type::OID, &mut ranges)?;
+        let rngsubtype: Option<Oid> = read_column(b, &Type::OID, &mut ranges)?;
+        let basetype: Oid = read_column(b, &Type::OID, &mut ranges)?;
+        let schema: String = read_column(b, &Type::NAME, &mut ranges)?;
+        let relid: Oid = read_column(b, &Type::OID, &mut ranges)?;
+
+        {
+            // should be none
+            let None = stream.next().await.transpose()? else {
+                todo!()
+            };
+            drop(stream);
+        }
+
+        let kind = if type_ == b'e' as i8 {
+            let variants = Self::get_enum_variants(c, oid).await?;
+            Kind::Enum(variants)
+        } else if type_ == b'p' as i8 {
+            Kind::Pseudo
+        } else if basetype != 0 {
+            let type_ = Self::get_type_rec(c, basetype).await?;
+            Kind::Domain(type_)
+        } else if elem_oid != 0 {
+            let type_ = Self::get_type_rec(c, elem_oid).await?;
+            Kind::Array(type_)
+        } else if relid != 0 {
+            let fields = Self::get_composite_fields(c, relid).await?;
+            Kind::Composite(fields)
+        } else if let Some(rngsubtype) = rngsubtype {
+            let type_ = Self::get_type_rec(c, rngsubtype).await?;
+            Kind::Range(type_)
+        } else {
+            Kind::Simple
+        };
+
+        let type_ = Type::new(name, oid, kind, schema);
+        c.typecache.insert(oid, type_.clone());
+
+        Ok(type_)
+    }
+
+    async fn get_enum_variants<
+        S: AsyncRead + AsyncWrite + Unpin + Send,
+        T: AsyncRead + AsyncWrite + Unpin + Send,
+    >(
+        c: &mut Connection<S, T>,
+        oid: Oid,
+    ) -> Result<Vec<String>, Error> {
+        let queries = Self::new(c).await?;
+
+        frontend::bind(
+            "",
+            &queries.enum_query,
+            [1], // the only parameter is in binary format
+            [oid],
+            |param, buf| param.to_sql(&Type::OID, buf).map(map_is_null),
+            Some(1), // binary return type
+            &mut c.raw.buf,
+        )
+        .map_err(|e| match e {
+            frontend::BindError::Conversion(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
+            frontend::BindError::Serialization(io) => io,
+        })?;
+        frontend::execute("", 0, &mut c.raw.buf)?;
+
+        c.sync().await?;
+
+        let mut stream = c.stream_query_results().await?;
+        let mut variants = Vec::new();
+        while let Some(row) = stream.next().await.transpose()? {
+            let variant: String = read_column(row.buffer(), &Type::NAME, &mut row.ranges())?;
+            variants.push(variant);
+        }
+
+        c.wait_for_ready().await?;
+
+        Ok(variants)
+    }
+
+    async fn get_composite_fields<
+        S: AsyncRead + AsyncWrite + Unpin + Send,
+        T: AsyncRead + AsyncWrite + Unpin + Send,
+    >(
+        c: &mut Connection<S, T>,
+        oid: Oid,
+    ) -> Result<Vec<Field>, Error> {
+        let queries = Self::new(c).await?;
+
+        frontend::bind(
+            "",
+            &queries.composite_query,
+            [1], // the only parameter is in binary format
+            [oid],
+            |param, buf| param.to_sql(&Type::OID, buf).map(map_is_null),
+            Some(1), // binary return type
+            &mut c.raw.buf,
+        )
+        .map_err(|e| match e {
+            frontend::BindError::Conversion(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
+            frontend::BindError::Serialization(io) => io,
+        })?;
+        frontend::execute("", 0, &mut c.raw.buf)?;
+
+        c.sync().await?;
+
+        let mut stream = c.stream_query_results().await?;
+        let mut fields = Vec::new();
+        while let Some(row) = stream.next().await.transpose()? {
+            let mut ranges = row.ranges();
+            let name: String = read_column(row.buffer(), &Type::NAME, &mut ranges)?;
+            let oid: Oid = read_column(row.buffer(), &Type::OID, &mut ranges)?;
+            fields.push((name, oid));
+        }
+
+        c.wait_for_ready().await?;
+
+        let mut output_fields = Vec::with_capacity(fields.len());
+        for (name, oid) in fields {
+            let type_ = Self::get_type_rec(c, oid).await?;
+            output_fields.push(Field::new(name, type_))
+        }
+
+        Ok(output_fields)
+    }
+}
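For reference, the classification in get_type above turns one TYPEINFO_QUERY row into a Kind in a fixed precedence order (enum, pseudo, domain, array, composite, range, else simple), and every OID the row references is then fed back through get_type_rec. A self-contained sketch of just that dispatch, over a plain struct mirroring the row; the field and type names here are illustrative assumptions, and the real code uses tokio_postgres::types::Kind, which carries fully resolved Types rather than bare OIDs:

// Sketch of the kind-dispatch over one typeinfo row (assumed shape).
type Oid = u32;

struct TypeinfoRow {
    typtype: u8,          // 'b' base, 'e' enum, 'p' pseudo, ...
    typelem: Oid,         // element type OID for arrays, else 0
    rngsubtype: Option<Oid>,
    typbasetype: Oid,     // underlying type OID for domains, else 0
    typrelid: Oid,        // backing relation OID for composites, else 0
}

#[derive(Debug, PartialEq)]
enum Kind {
    Enum, Pseudo, Domain(Oid), Array(Oid), Composite(Oid), Range(Oid), Simple,
}

// Same precedence order as the diff: enum, pseudo, domain, array, composite, range.
fn classify(r: &TypeinfoRow) -> Kind {
    if r.typtype == b'e' { Kind::Enum }
    else if r.typtype == b'p' { Kind::Pseudo }
    else if r.typbasetype != 0 { Kind::Domain(r.typbasetype) }
    else if r.typelem != 0 { Kind::Array(r.typelem) }
    else if r.typrelid != 0 { Kind::Composite(r.typrelid) }
    else if let Some(sub) = r.rngsubtype { Kind::Range(sub) }
    else { Kind::Simple }
}

fn main() {
    // An int4 array type: typelem points at int4's OID (23).
    let arr = TypeinfoRow { typtype: b'b', typelem: 23, rngsubtype: None, typbasetype: 0, typrelid: 0 };
    assert_eq!(classify(&arr), Kind::Array(23));
}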