Compare commits

...

4 Commits

5 changed files with 10 additions and 36 deletions

View File

@@ -1,11 +1,9 @@
use crate::client::SocketConfig;
use crate::codec::BackendMessage;
use crate::config::Host;
use crate::connect_raw::connect_raw;
use crate::connect_socket::connect_socket;
use crate::tls::{MakeTlsConnect, TlsConnect};
use crate::{Client, Config, Connection, Error, RawConnection};
use postgres_protocol2::message::backend::Message;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
@@ -43,7 +41,7 @@ where
let RawConnection {
stream,
parameters,
delayed_notice,
delayed_notice: _,
process_id,
secret_key,
} = connect_raw(socket, tls, config).await?;
@@ -63,13 +61,7 @@ where
secret_key,
);
// delayed notices are always sent as "Async" messages.
let delayed = delayed_notice
.into_iter()
.map(|m| BackendMessage::Async(Message::NoticeResponse(m)))
.collect();
let connection = Connection::new(stream, delayed, parameters, receiver);
let connection = Connection::new(stream, parameters, receiver);
Ok((client, connection))
}

View File

@@ -1,11 +1,10 @@
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
use crate::error::DbError;
use crate::maybe_tls_stream::MaybeTlsStream;
use crate::{AsyncMessage, Error, Notification};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_util::{ready, Sink, Stream};
use log::{info, trace};
use log::trace;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use std::collections::{HashMap, VecDeque};
@@ -55,7 +54,7 @@ pub struct Connection<S, T> {
/// HACK: we need this in the Neon Proxy to forward params.
pub parameters: HashMap<String, String>,
receiver: mpsc::UnboundedReceiver<Request>,
pending_responses: VecDeque<BackendMessage>,
pending_responses: Option<BackendMessage>,
responses: VecDeque<Response>,
state: State,
}
@@ -67,7 +66,6 @@ where
{
pub(crate) fn new(
stream: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
pending_responses: VecDeque<BackendMessage>,
parameters: HashMap<String, String>,
receiver: mpsc::UnboundedReceiver<Request>,
) -> Connection<S, T> {
@@ -75,7 +73,7 @@ where
stream,
parameters,
receiver,
pending_responses,
pending_responses: None,
responses: VecDeque::new(),
state: State::Active,
}
@@ -85,7 +83,7 @@ where
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<BackendMessage, Error>>> {
if let Some(message) = self.pending_responses.pop_front() {
if let Some(message) = self.pending_responses.take() {
trace!("retrying pending response");
return Poll::Ready(Some(Ok(message)));
}
@@ -109,9 +107,8 @@ where
};
let (mut messages, request_complete) = match message {
BackendMessage::Async(Message::NoticeResponse(body)) => {
let error = DbError::parse(&mut body.fields()).map_err(Error::parse)?;
return Poll::Ready(Ok(AsyncMessage::Notice(error)));
BackendMessage::Async(Message::NoticeResponse(_)) => {
continue;
}
BackendMessage::Async(Message::NotificationResponse(body)) => {
let notification = Notification {
@@ -160,7 +157,7 @@ where
}
Poll::Pending => {
self.responses.push_front(response);
self.pending_responses.push_back(BackendMessage::Normal {
self.pending_responses = Some(BackendMessage::Normal {
messages,
request_complete,
});
@@ -328,11 +325,7 @@ where
type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
while let Some(message) = ready!(self.poll_message(cx)?) {
if let AsyncMessage::Notice(notice) = message {
info!("{}: {}", notice.severity(), notice.message());
}
}
while ready!(self.poll_message(cx)?).is_some() {}
Poll::Ready(Ok(()))
}
}

View File

@@ -6,7 +6,6 @@ pub use crate::client::{Client, SocketConfig};
pub use crate::config::Config;
pub use crate::connect_raw::RawConnection;
pub use crate::connection::Connection;
use crate::error::DbError;
pub use crate::error::Error;
pub use crate::generic_client::GenericClient;
pub use crate::query::RowStream;
@@ -100,10 +99,6 @@ impl Notification {
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum AsyncMessage {
/// A notice.
///
/// Notices use the same format as errors, but aren't "errors" per-se.
Notice(DbError),
/// A notification.
///
/// Connections can subscribe to notifications with the `LISTEN` command.

View File

@@ -124,9 +124,6 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
let message = ready!(connection.poll_message(cx));
match message {
Some(Ok(AsyncMessage::Notice(notice))) => {
info!(%session_id, "notice: {}", notice);
}
Some(Ok(AsyncMessage::Notification(notif))) => {
warn!(%session_id, pid = notif.process_id(), channel = notif.channel(), "notification received");
}

View File

@@ -227,9 +227,6 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
let message = ready!(connection.poll_message(cx));
match message {
Some(Ok(AsyncMessage::Notice(notice))) => {
info!(%session_id, "notice: {}", notice);
}
Some(Ok(AsyncMessage::Notification(notif))) => {
warn!(%session_id, pid = notif.process_id(), channel = notif.channel(), "notification received");
}