Compare commits

...

3 Commits

Author SHA1 Message Date
Conrad Ludgate
edea436191 only one client, so only one channel pair 2025-05-21 22:54:56 +01:00
Conrad Ludgate
4b0c7f9530 remove arc around inner client 2025-05-21 21:58:03 +01:00
Conrad Ludgate
ab61864df1 proxy(tokio-postgres): move statement cleanup to client drop 2025-05-21 21:53:57 +01:00
11 changed files with 186 additions and 250 deletions

View File

@@ -1,14 +1,12 @@
use std::collections::HashMap;
use std::fmt;
use std::net::IpAddr;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_util::{TryStreamExt, future, ready};
use parking_lot::Mutex;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use serde::{Deserialize, Serialize};
@@ -16,7 +14,6 @@ use tokio::sync::mpsc;
use crate::codec::{BackendMessages, FrontendMessage};
use crate::config::{Host, SslMode};
use crate::connection::{Request, RequestMessages};
use crate::query::RowStream;
use crate::simple_query::SimpleQueryStream;
use crate::types::{Oid, Type};
@@ -26,19 +23,43 @@ use crate::{
};
pub struct Responses {
/// new messages from conn
receiver: mpsc::Receiver<BackendMessages>,
/// current batch of messages
cur: BackendMessages,
/// number of total queries sent.
waiting: usize,
/// number of ReadyForQuery messages received.
received: usize,
}
impl Responses {
pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Message, Error>> {
loop {
match self.cur.next().map_err(Error::parse)? {
Some(Message::ErrorResponse(body)) => return Poll::Ready(Err(Error::db(body))),
Some(message) => return Poll::Ready(Ok(message)),
None => {}
// get the next saved message
if let Some(message) = self.cur.next().map_err(Error::parse)? {
let received = self.received;
// increase the query head if this is the last message.
if let Message::ReadyForQuery(_) = message {
self.received += 1;
}
// check if the client has skipped this query.
if received + 1 < self.waiting {
// grab the next message.
continue;
}
// convenience: turn the error messaage into a proper error.
let res = match message {
Message::ErrorResponse(body) => Err(Error::db(body)),
message => Ok(message),
};
return Poll::Ready(res);
}
// get the next back of messages.
match ready!(self.receiver.poll_recv(cx)) {
Some(messages) => self.cur = messages,
None => return Poll::Ready(Err(Error::closed())),
@@ -65,33 +86,28 @@ pub(crate) struct CachedTypeInfo {
}
pub struct InnerClient {
sender: mpsc::UnboundedSender<Request>,
sender: mpsc::UnboundedSender<FrontendMessage>,
responses: Responses,
/// A buffer to use when writing out postgres commands.
buffer: Mutex<BytesMut>,
buffer: BytesMut,
}
impl InnerClient {
pub fn send(&self, messages: RequestMessages) -> Result<Responses, Error> {
let (sender, receiver) = mpsc::channel(1);
let request = Request { messages, sender };
self.sender.send(request).map_err(|_| Error::closed())?;
Ok(Responses {
receiver,
cur: BackendMessages::empty(),
})
pub fn send(&mut self, messages: FrontendMessage) -> Result<&mut Responses, Error> {
self.sender.send(messages).map_err(|_| Error::closed())?;
self.responses.waiting += 1;
Ok(&mut self.responses)
}
/// Call the given function with a buffer to be used when writing out
/// postgres commands.
pub fn with_buf<F, R>(&self, f: F) -> R
pub fn with_buf<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&mut BytesMut) -> R,
{
let mut buffer = self.buffer.lock();
let r = f(&mut buffer);
buffer.clear();
let r = f(&mut self.buffer);
self.buffer.clear();
r
}
}
@@ -109,7 +125,7 @@ pub struct SocketConfig {
/// The client is one half of what is returned when a connection is established. Users interact with the database
/// through this client object.
pub struct Client {
inner: Arc<InnerClient>,
inner: InnerClient,
cached_typeinfo: CachedTypeInfo,
socket_config: SocketConfig,
@@ -118,19 +134,39 @@ pub struct Client {
secret_key: i32,
}
impl Drop for Client {
fn drop(&mut self) {
if let Some(stmt) = self.cached_typeinfo.typeinfo.take() {
let buf = self.inner.with_buf(|buf| {
frontend::close(b'S', stmt.name(), buf).unwrap();
frontend::sync(buf);
buf.split().freeze()
});
let _ = self.inner.send(FrontendMessage::Raw(buf));
}
}
}
impl Client {
pub(crate) fn new(
sender: mpsc::UnboundedSender<Request>,
sender: mpsc::UnboundedSender<FrontendMessage>,
receiver: mpsc::Receiver<BackendMessages>,
socket_config: SocketConfig,
ssl_mode: SslMode,
process_id: i32,
secret_key: i32,
) -> Client {
Client {
inner: Arc::new(InnerClient {
inner: InnerClient {
sender,
responses: Responses {
receiver,
cur: BackendMessages::empty(),
waiting: 0,
received: 0,
},
buffer: Default::default(),
}),
},
cached_typeinfo: Default::default(),
socket_config,
@@ -145,19 +181,23 @@ impl Client {
self.process_id
}
pub(crate) fn inner(&self) -> &Arc<InnerClient> {
&self.inner
pub(crate) fn inner(&mut self) -> &mut InnerClient {
&mut self.inner
}
/// Pass text directly to the Postgres backend to allow it to sort out typing itself and
/// to save a roundtrip
pub async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
pub async fn query_raw_txt<S, I>(
&mut self,
statement: &str,
params: I,
) -> Result<RowStream, Error>
where
S: AsRef<str>,
I: IntoIterator<Item = Option<S>>,
I::IntoIter: ExactSizeIterator,
{
query::query_txt(&self.inner, statement, params).await
query::query_txt(&mut self.inner, statement, params).await
}
/// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
@@ -173,11 +213,14 @@ impl Client {
/// Prepared statements should be use for any query which contains user-specified data, as they provided the
/// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
/// them to this method!
pub async fn simple_query(&self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
pub async fn simple_query(&mut self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
self.simple_query_raw(query).await?.try_collect().await
}
pub(crate) async fn simple_query_raw(&self, query: &str) -> Result<SimpleQueryStream, Error> {
pub(crate) async fn simple_query_raw(
&mut self,
query: &str,
) -> Result<SimpleQueryStream, Error> {
simple_query::simple_query(self.inner(), query).await
}
@@ -191,7 +234,7 @@ impl Client {
/// Prepared statements should be use for any query which contains user-specified data, as they provided the
/// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
/// them to this method!
pub async fn batch_execute(&self, query: &str) -> Result<ReadyForQueryStatus, Error> {
pub async fn batch_execute(&mut self, query: &str) -> Result<ReadyForQueryStatus, Error> {
simple_query::batch_execute(self.inner(), query).await
}
@@ -208,7 +251,7 @@ impl Client {
/// The transaction will roll back by default - use the `commit` method to commit it.
pub async fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
struct RollbackIfNotDone<'me> {
client: &'me Client,
client: &'me mut Client,
done: bool,
}
@@ -222,10 +265,7 @@ impl Client {
frontend::query("ROLLBACK", buf).unwrap();
buf.split().freeze()
});
let _ = self
.client
.inner()
.send(RequestMessages::Single(FrontendMessage::Raw(buf)));
let _ = self.client.inner().send(FrontendMessage::Raw(buf));
}
}
@@ -239,7 +279,7 @@ impl Client {
client: self,
done: false,
};
self.batch_execute("BEGIN").await?;
cleaner.client.batch_execute("BEGIN").await?;
cleaner.done = true;
}
@@ -267,7 +307,7 @@ impl Client {
/// Query for type information
pub(crate) async fn get_type_inner(&mut self, oid: Oid) -> Result<Type, Error> {
crate::prepare::get_type(&self.inner, &mut self.cached_typeinfo, oid).await
crate::prepare::get_type(&mut self.inner, &mut self.cached_typeinfo, oid).await
}
/// Determines if the connection to the server has already closed.

View File

@@ -1,21 +1,16 @@
use std::io;
use bytes::{Buf, Bytes, BytesMut};
use bytes::{Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use postgres_protocol2::message::backend;
use postgres_protocol2::message::frontend::CopyData;
use tokio_util::codec::{Decoder, Encoder};
pub enum FrontendMessage {
Raw(Bytes),
CopyData(CopyData<Box<dyn Buf + Send>>),
}
pub enum BackendMessage {
Normal {
messages: BackendMessages,
request_complete: bool,
},
Normal { messages: BackendMessages },
Async(backend::Message),
}
@@ -44,7 +39,6 @@ impl Encoder<FrontendMessage> for PostgresCodec {
fn encode(&mut self, item: FrontendMessage, dst: &mut BytesMut) -> io::Result<()> {
match item {
FrontendMessage::Raw(buf) => dst.extend_from_slice(&buf),
FrontendMessage::CopyData(data) => data.write(dst),
}
Ok(())
@@ -57,7 +51,6 @@ impl Decoder for PostgresCodec {
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<BackendMessage>, io::Error> {
let mut idx = 0;
let mut request_complete = false;
while let Some(header) = backend::Header::parse(&src[idx..])? {
let len = header.len() as usize + 1;
@@ -82,7 +75,6 @@ impl Decoder for PostgresCodec {
idx += len;
if header.tag() == backend::READY_FOR_QUERY_TAG {
request_complete = true;
break;
}
}
@@ -92,7 +84,6 @@ impl Decoder for PostgresCodec {
} else {
Ok(Some(BackendMessage::Normal {
messages: BackendMessages(src.split_to(idx)),
request_complete,
}))
}
}

View File

@@ -59,9 +59,11 @@ where
connect_timeout: config.connect_timeout,
};
let (sender, receiver) = mpsc::unbounded_channel();
let (client_tx, conn_rx) = mpsc::unbounded_channel();
let (conn_tx, client_rx) = mpsc::channel(4);
let client = Client::new(
sender,
client_tx,
client_rx,
socket_config,
config.ssl_mode,
process_id,
@@ -74,7 +76,7 @@ where
.map(|m| BackendMessage::Async(Message::NoticeResponse(m)))
.collect();
let connection = Connection::new(stream, delayed, parameters, receiver);
let connection = Connection::new(stream, delayed, parameters, conn_tx, conn_rx);
Ok((client, connection))
}

View File

@@ -4,7 +4,6 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_util::{Sink, Stream, ready};
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
@@ -19,30 +18,12 @@ use crate::error::DbError;
use crate::maybe_tls_stream::MaybeTlsStream;
use crate::{AsyncMessage, Error, Notification};
pub enum RequestMessages {
Single(FrontendMessage),
}
pub struct Request {
pub messages: RequestMessages,
pub sender: mpsc::Sender<BackendMessages>,
}
pub struct Response {
sender: PollSender<BackendMessages>,
}
#[derive(PartialEq, Debug)]
enum State {
Active,
Closing,
}
enum WriteReady {
Terminating,
WaitingOnRead,
}
/// A connection to a PostgreSQL database.
///
/// This is one half of what is returned when a new connection is established. It performs the actual IO with the
@@ -56,9 +37,11 @@ pub struct Connection<S, T> {
pub stream: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
/// HACK: we need this in the Neon Proxy to forward params.
pub parameters: HashMap<String, String>,
receiver: mpsc::UnboundedReceiver<Request>,
sender: PollSender<BackendMessages>,
receiver: mpsc::UnboundedReceiver<FrontendMessage>,
pending_responses: VecDeque<BackendMessage>,
responses: VecDeque<Response>,
state: State,
}
@@ -71,14 +54,15 @@ where
stream: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
pending_responses: VecDeque<BackendMessage>,
parameters: HashMap<String, String>,
receiver: mpsc::UnboundedReceiver<Request>,
sender: mpsc::Sender<BackendMessages>,
receiver: mpsc::UnboundedReceiver<FrontendMessage>,
) -> Connection<S, T> {
Connection {
stream,
parameters,
sender: PollSender::new(sender),
receiver,
pending_responses,
responses: VecDeque::new(),
state: State::Active,
}
}
@@ -110,7 +94,7 @@ where
}
};
let (mut messages, request_complete) = match message {
let messages = match message {
BackendMessage::Async(Message::NoticeResponse(body)) => {
let error = DbError::parse(&mut body.fields()).map_err(Error::parse)?;
return Poll::Ready(Ok(AsyncMessage::Notice(error)));
@@ -131,41 +115,19 @@ where
continue;
}
BackendMessage::Async(_) => unreachable!(),
BackendMessage::Normal {
messages,
request_complete,
} => (messages, request_complete),
BackendMessage::Normal { messages } => messages,
};
let mut response = match self.responses.pop_front() {
Some(response) => response,
None => match messages.next().map_err(Error::parse)? {
Some(Message::ErrorResponse(error)) => {
return Poll::Ready(Err(Error::db(error)));
}
_ => return Poll::Ready(Err(Error::unexpected_message())),
},
};
match response.sender.poll_reserve(cx) {
match self.sender.poll_reserve(cx) {
Poll::Ready(Ok(())) => {
let _ = response.sender.send_item(messages);
if !request_complete {
self.responses.push_front(response);
}
let _ = self.sender.send_item(messages);
}
Poll::Ready(Err(_)) => {
// we need to keep paging through the rest of the messages even if the receiver's hung up
if !request_complete {
self.responses.push_front(response);
}
return Poll::Ready(Err(Error::closed()));
}
Poll::Pending => {
self.responses.push_front(response);
self.pending_responses.push_back(BackendMessage::Normal {
messages,
request_complete,
});
self.pending_responses
.push_back(BackendMessage::Normal { messages });
trace!("poll_read: waiting on sender");
return Poll::Pending;
}
@@ -174,7 +136,7 @@ where
}
/// Fetch the next client request and enqueue the response sender.
fn poll_request(&mut self, cx: &mut Context<'_>) -> Poll<Option<RequestMessages>> {
fn poll_request(&mut self, cx: &mut Context<'_>) -> Poll<Option<FrontendMessage>> {
if self.receiver.is_closed() {
return Poll::Ready(None);
}
@@ -182,10 +144,7 @@ where
match self.receiver.poll_recv(cx) {
Poll::Ready(Some(request)) => {
trace!("polled new request");
self.responses.push_back(Response {
sender: PollSender::new(request.sender),
});
Poll::Ready(Some(request.messages))
Poll::Ready(Some(request))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
@@ -194,7 +153,7 @@ where
/// Process client requests and write them to the postgres connection, flushing if necessary.
/// client -> postgres
fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<WriteReady, Error>> {
fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
loop {
if Pin::new(&mut self.stream)
.poll_ready(cx)
@@ -209,14 +168,14 @@ where
match self.poll_request(cx) {
// send the message to postgres
Poll::Ready(Some(RequestMessages::Single(request))) => {
Poll::Ready(Some(request)) => {
Pin::new(&mut self.stream)
.start_send(request)
.map_err(Error::io)?;
}
// No more messages from the client, and no more responses to wait for.
// Send a terminate message to postgres
Poll::Ready(None) if self.responses.is_empty() => {
Poll::Ready(None) => {
trace!("poll_write: at eof, terminating");
let mut request = BytesMut::new();
frontend::terminate(&mut request);
@@ -228,16 +187,7 @@ where
trace!("poll_write: sent eof, closing");
trace!("poll_write: done");
return Poll::Ready(Ok(WriteReady::Terminating));
}
// No more messages from the client, but there are still some responses to wait for.
Poll::Ready(None) => {
trace!(
"poll_write: at eof, pending responses {}",
self.responses.len()
);
ready!(self.poll_flush(cx))?;
return Poll::Ready(Ok(WriteReady::WaitingOnRead));
return Poll::Ready(Ok(()));
}
// Still waiting for a message from the client.
Poll::Pending => {
@@ -298,7 +248,7 @@ where
// if the state is still active, try read from and write to postgres.
let message = self.poll_read(cx)?;
let closing = self.poll_write(cx)?;
if let Poll::Ready(WriteReady::Terminating) = closing {
if let Poll::Ready(()) = closing {
self.state = State::Closing;
}

View File

@@ -15,7 +15,7 @@ mod private {
/// This trait is "sealed", and cannot be implemented outside of this crate.
pub trait GenericClient: private::Sealed {
/// Like `Client::query_raw_txt`.
async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
async fn query_raw_txt<S, I>(&mut self, statement: &str, params: I) -> Result<RowStream, Error>
where
S: AsRef<str> + Sync + Send,
I: IntoIterator<Item = Option<S>> + Sync + Send,
@@ -28,7 +28,7 @@ pub trait GenericClient: private::Sealed {
impl private::Sealed for Client {}
impl GenericClient for Client {
async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
async fn query_raw_txt<S, I>(&mut self, statement: &str, params: I) -> Result<RowStream, Error>
where
S: AsRef<str> + Sync + Send,
I: IntoIterator<Item = Option<S>> + Sync + Send,
@@ -46,7 +46,7 @@ impl GenericClient for Client {
impl private::Sealed for Transaction<'_> {}
impl GenericClient for Transaction<'_> {
async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
async fn query_raw_txt<S, I>(&mut self, statement: &str, params: I) -> Result<RowStream, Error>
where
S: AsRef<str> + Sync + Send,
I: IntoIterator<Item = Option<S>> + Sync + Send,

View File

@@ -1,6 +1,5 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
@@ -11,7 +10,6 @@ use tracing::debug;
use crate::client::{CachedTypeInfo, InnerClient};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::types::{Kind, Oid, Type};
use crate::{Column, Error, Statement, query, slice_iter};
@@ -24,13 +22,13 @@ WHERE t.oid = $1
";
async fn prepare_typecheck(
client: &Arc<InnerClient>,
client: &mut InnerClient,
name: &'static str,
query: &str,
types: &[Type],
) -> Result<Statement, Error> {
let buf = encode(client, name, query, types)?;
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
let responses = client.send(FrontendMessage::Raw(buf))?;
match responses.next().await? {
Message::ParseComplete => {}
@@ -65,10 +63,15 @@ async fn prepare_typecheck(
}
}
Ok(Statement::new(client, name, parameters, columns))
Ok(Statement::new(name, parameters, columns))
}
fn encode(client: &InnerClient, name: &str, query: &str, types: &[Type]) -> Result<Bytes, Error> {
fn encode(
client: &mut InnerClient,
name: &str,
query: &str,
types: &[Type],
) -> Result<Bytes, Error> {
if types.is_empty() {
debug!("preparing query {}: {}", name, query);
} else {
@@ -84,7 +87,7 @@ fn encode(client: &InnerClient, name: &str, query: &str, types: &[Type]) -> Resu
}
pub async fn get_type(
client: &Arc<InnerClient>,
client: &mut InnerClient,
typecache: &mut CachedTypeInfo,
oid: Oid,
) -> Result<Type, Error> {
@@ -139,7 +142,7 @@ pub async fn get_type(
}
fn get_type_rec<'a>(
client: &'a Arc<InnerClient>,
client: &'a mut InnerClient,
typecache: &'a mut CachedTypeInfo,
oid: Oid,
) -> Pin<Box<dyn Future<Output = Result<Type, Error>> + Send + 'a>> {
@@ -147,7 +150,7 @@ fn get_type_rec<'a>(
}
async fn typeinfo_statement(
client: &Arc<InnerClient>,
client: &mut InnerClient,
typecache: &mut CachedTypeInfo,
) -> Result<Statement, Error> {
if let Some(stmt) = &typecache.typeinfo {

View File

@@ -1,13 +1,10 @@
use std::fmt;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use bytes::{BufMut, Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_util::{Stream, ready};
use pin_project_lite::pin_project;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use postgres_types2::{Format, ToSql, Type};
@@ -15,7 +12,6 @@ use tracing::debug;
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::types::IsNull;
use crate::{Column, Error, ReadyForQueryStatus, Row, Statement};
@@ -28,7 +24,7 @@ impl fmt::Debug for BorrowToSqlParamsDebug<'_> {
}
pub async fn query<'a, I>(
client: &InnerClient,
client: &mut InnerClient,
statement: Statement,
params: I,
) -> Result<RowStream, Error>
@@ -49,20 +45,19 @@ where
};
let responses = start(client, buf).await?;
Ok(RowStream {
statement,
responses,
statement,
command_tag: None,
status: ReadyForQueryStatus::Unknown,
output_format: Format::Binary,
_p: PhantomPinned,
})
}
pub async fn query_txt<S, I>(
client: &Arc<InnerClient>,
pub async fn query_txt<'a, S, I>(
client: &'a mut InnerClient,
query: &str,
params: I,
) -> Result<RowStream, Error>
) -> Result<RowStream<'a>, Error>
where
S: AsRef<str>,
I: IntoIterator<Item = Option<S>>,
@@ -109,7 +104,7 @@ where
})?;
// now read the responses
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
let responses = client.send(FrontendMessage::Raw(buf))?;
match responses.next().await? {
Message::ParseComplete => {}
@@ -150,17 +145,16 @@ where
}
Ok(RowStream {
statement: Statement::new_anonymous(parameters, columns),
responses,
statement: Statement::new_anonymous(parameters, columns),
command_tag: None,
status: ReadyForQueryStatus::Unknown,
output_format: Format::Text,
_p: PhantomPinned,
})
}
async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
async fn start(client: &mut InnerClient, buf: Bytes) -> Result<&mut Responses, Error> {
let responses = client.send(FrontendMessage::Raw(buf))?;
match responses.next().await? {
Message::BindComplete => {}
@@ -170,7 +164,11 @@ async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {
Ok(responses)
}
pub fn encode<'a, I>(client: &InnerClient, statement: &Statement, params: I) -> Result<Bytes, Error>
pub fn encode<'a, I>(
client: &mut InnerClient,
statement: &Statement,
params: I,
) -> Result<Bytes, Error>
where
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
@@ -234,41 +232,37 @@ where
}
}
pin_project! {
/// A stream of table rows.
pub struct RowStream {
statement: Statement,
responses: Responses,
command_tag: Option<String>,
output_format: Format,
status: ReadyForQueryStatus,
#[pin]
_p: PhantomPinned,
}
/// A stream of table rows.
pub struct RowStream<'a> {
responses: &'a mut Responses,
output_format: Format,
pub statement: Statement,
pub command_tag: Option<String>,
pub status: ReadyForQueryStatus,
}
impl Stream for RowStream {
impl Stream for RowStream<'_> {
type Item = Result<Row, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let this = self.get_mut();
loop {
match ready!(this.responses.poll_next(cx)?) {
Message::DataRow(body) => {
return Poll::Ready(Some(Ok(Row::new(
this.statement.clone(),
body,
*this.output_format,
this.output_format,
)?)));
}
Message::EmptyQueryResponse | Message::PortalSuspended => {}
Message::CommandComplete(body) => {
if let Ok(tag) = body.tag() {
*this.command_tag = Some(tag.to_string());
this.command_tag = Some(tag.to_string());
}
}
Message::ReadyForQuery(status) => {
*this.status = status.into();
this.status = status.into();
return Poll::Ready(None);
}
_ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
@@ -276,24 +270,3 @@ impl Stream for RowStream {
}
}
}
impl RowStream {
/// Returns information about the columns of data in the row.
pub fn columns(&self) -> &[Column] {
self.statement.columns()
}
/// Returns the command tag of this query.
///
/// This is only available after the stream has been exhausted.
pub fn command_tag(&self) -> Option<String> {
self.command_tag.clone()
}
/// Returns if the connection is ready for querying, with the status of the connection.
///
/// This might be available only after the stream has been exhausted.
pub fn ready_status(&self) -> ReadyForQueryStatus {
self.status
}
}

View File

@@ -1,4 +1,3 @@
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -13,7 +12,6 @@ use tracing::debug;
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::{Error, ReadyForQueryStatus, SimpleQueryMessage, SimpleQueryRow};
/// Information about a column of a single query row.
@@ -33,28 +31,30 @@ impl SimpleColumn {
}
}
pub async fn simple_query(client: &InnerClient, query: &str) -> Result<SimpleQueryStream, Error> {
pub async fn simple_query<'a>(
client: &'a mut InnerClient,
query: &str,
) -> Result<SimpleQueryStream<'a>, Error> {
debug!("executing simple query: {}", query);
let buf = encode(client, query)?;
let responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
let responses = client.send(FrontendMessage::Raw(buf))?;
Ok(SimpleQueryStream {
responses,
columns: None,
status: ReadyForQueryStatus::Unknown,
_p: PhantomPinned,
})
}
pub async fn batch_execute(
client: &InnerClient,
client: &mut InnerClient,
query: &str,
) -> Result<ReadyForQueryStatus, Error> {
debug!("executing statement batch: {}", query);
let buf = encode(client, query)?;
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
let responses = client.send(FrontendMessage::Raw(buf))?;
loop {
match responses.next().await? {
@@ -68,7 +68,7 @@ pub async fn batch_execute(
}
}
pub(crate) fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
pub(crate) fn encode(client: &mut InnerClient, query: &str) -> Result<Bytes, Error> {
client.with_buf(|buf| {
frontend::query(query, buf).map_err(Error::encode)?;
Ok(buf.split().freeze())
@@ -77,16 +77,14 @@ pub(crate) fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error>
pin_project! {
/// A stream of simple query results.
pub struct SimpleQueryStream {
responses: Responses,
pub struct SimpleQueryStream<'a> {
responses: &'a mut Responses,
columns: Option<Arc<[SimpleColumn]>>,
status: ReadyForQueryStatus,
#[pin]
_p: PhantomPinned,
}
}
impl SimpleQueryStream {
impl SimpleQueryStream<'_> {
/// Returns if the connection is ready for querying, with the status of the connection.
///
/// This might be available only after the stream has been exhausted.
@@ -95,7 +93,7 @@ impl SimpleQueryStream {
}
}
impl Stream for SimpleQueryStream {
impl Stream for SimpleQueryStream<'_> {
type Item = Result<SimpleQueryMessage, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {

View File

@@ -1,35 +1,16 @@
use std::fmt;
use std::sync::{Arc, Weak};
use std::sync::Arc;
use crate::types::Type;
use postgres_protocol2::Oid;
use postgres_protocol2::message::backend::Field;
use postgres_protocol2::message::frontend;
use crate::client::InnerClient;
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::types::Type;
struct StatementInner {
client: Weak<InnerClient>,
name: &'static str,
params: Vec<Type>,
columns: Vec<Column>,
}
impl Drop for StatementInner {
fn drop(&mut self) {
if let Some(client) = self.client.upgrade() {
let buf = client.with_buf(|buf| {
frontend::close(b'S', self.name, buf).unwrap();
frontend::sync(buf);
buf.split().freeze()
});
let _ = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)));
}
}
}
/// A prepared statement.
///
/// Prepared statements can only be used with the connection that created them.
@@ -37,14 +18,8 @@ impl Drop for StatementInner {
pub struct Statement(Arc<StatementInner>);
impl Statement {
pub(crate) fn new(
inner: &Arc<InnerClient>,
name: &'static str,
params: Vec<Type>,
columns: Vec<Column>,
) -> Statement {
pub(crate) fn new(name: &'static str, params: Vec<Type>, columns: Vec<Column>) -> Statement {
Statement(Arc::new(StatementInner {
client: Arc::downgrade(inner),
name,
params,
columns,
@@ -53,7 +28,6 @@ impl Statement {
pub(crate) fn new_anonymous(params: Vec<Type>, columns: Vec<Column>) -> Statement {
Statement(Arc::new(StatementInner {
client: Weak::new(),
name: "<anonymous>",
params,
columns,

View File

@@ -1,7 +1,6 @@
use postgres_protocol2::message::frontend;
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::query::RowStream;
use crate::{CancelToken, Client, Error, ReadyForQueryStatus};
@@ -24,10 +23,7 @@ impl Drop for Transaction<'_> {
frontend::query("ROLLBACK", buf).unwrap();
buf.split().freeze()
});
let _ = self
.client
.inner()
.send(RequestMessages::Single(FrontendMessage::Raw(buf)));
let _ = self.client.inner().send(FrontendMessage::Raw(buf));
}
}
@@ -54,7 +50,11 @@ impl<'a> Transaction<'a> {
}
/// Like `Client::query_raw_txt`.
pub async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
pub async fn query_raw_txt<S, I>(
&mut self,
statement: &str,
params: I,
) -> Result<RowStream, Error>
where
S: AsRef<str>,
I: IntoIterator<Item = Option<S>>,

View File

@@ -14,7 +14,9 @@ use hyper::http::{HeaderName, HeaderValue};
use hyper::{HeaderMap, Request, Response, StatusCode, header};
use indexmap::IndexMap;
use postgres_client::error::{DbError, ErrorPosition, SqlState};
use postgres_client::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction};
use postgres_client::{
GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, RowStream, Transaction,
};
use pq_proto::StartupMessageParamsBuilder;
use serde::Serialize;
use serde_json::Value;
@@ -1092,12 +1094,10 @@ async fn query_to_json<T: GenericClient>(
let query_start = Instant::now();
let query_params = data.params;
let mut row_stream = std::pin::pin!(
client
.query_raw_txt(&data.query, query_params)
.await
.map_err(SqlOverHttpError::Postgres)?
);
let mut row_stream = client
.query_raw_txt(&data.query, query_params)
.await
.map_err(SqlOverHttpError::Postgres)?;
let query_acknowledged = Instant::now();
// Manually drain the stream into a vector to leave row_stream hanging
@@ -1118,10 +1118,15 @@ async fn query_to_json<T: GenericClient>(
}
let query_resp_end = Instant::now();
let ready = row_stream.ready_status();
let RowStream {
statement,
command_tag,
status: ready,
..
} = row_stream;
// grab the command tag and number of rows affected
let command_tag = row_stream.command_tag().unwrap_or_default();
let command_tag = command_tag.unwrap_or_default();
let mut command_tag_split = command_tag.split(' ');
let command_tag_name = command_tag_split.next().unwrap_or_default();
let command_tag_count = if command_tag_name == "INSERT" {
@@ -1142,11 +1147,11 @@ async fn query_to_json<T: GenericClient>(
"finished executing query"
);
let columns_len = row_stream.columns().len();
let columns_len = statement.columns().len();
let mut fields = Vec::with_capacity(columns_len);
let mut columns = Vec::with_capacity(columns_len);
for c in row_stream.columns() {
for c in statement.columns() {
fields.push(json!({
"name": c.name().to_owned(),
"dataTypeID": c.type_().oid(),