diff --git a/libs/proxy/tokio-postgres2/src/lib.rs b/libs/proxy/tokio-postgres2/src/lib.rs index e2a57a23c5..1673bd1844 100644 --- a/libs/proxy/tokio-postgres2/src/lib.rs +++ b/libs/proxy/tokio-postgres2/src/lib.rs @@ -10,8 +10,7 @@ use crate::error::DbError; pub use crate::error::Error; pub use crate::generic_client::GenericClient; pub use crate::query::RowStream; -pub use crate::row::{Row, SimpleQueryRow}; -pub use crate::simple_query::SimpleQueryStream; +pub use crate::row::Row; pub use crate::statement::{Column, Statement}; pub use crate::tls::NoTls; // pub use crate::to_statement::ToStatement; @@ -98,7 +97,6 @@ impl Notification { /// An asynchronous message from the server. #[allow(clippy::large_enum_variant)] #[derive(Debug, Clone)] -#[non_exhaustive] pub enum AsyncMessage { /// A notice. /// @@ -110,18 +108,6 @@ pub enum AsyncMessage { Notification(Notification), } -/// Message returned by the `SimpleQuery` stream. -#[derive(Debug)] -#[non_exhaustive] -pub enum SimpleQueryMessage { - /// A row of data. - Row(SimpleQueryRow), - /// A statement in the query has completed. - /// - /// The number of rows modified or selected is returned. - CommandComplete(u64), -} - fn slice_iter<'a>( s: &'a [&'a (dyn ToSql + Sync)], ) -> impl ExactSizeIterator + 'a { diff --git a/libs/proxy/tokio-postgres2/src/row.rs b/libs/proxy/tokio-postgres2/src/row.rs index 10e130707d..b8c5e84b4f 100644 --- a/libs/proxy/tokio-postgres2/src/row.rs +++ b/libs/proxy/tokio-postgres2/src/row.rs @@ -1,7 +1,4 @@ //! Rows. - -use crate::row::sealed::{AsName, Sealed}; -use crate::simple_query::SimpleColumn; use crate::statement::Column; use crate::types::{FromSql, Type, WrongType}; use crate::{Error, Statement}; @@ -11,89 +8,6 @@ use postgres_types2::{Format, WrongFormat}; use std::fmt; use std::ops::Range; use std::str; -use std::sync::Arc; - -mod sealed { - pub trait Sealed {} - - pub trait AsName { - fn as_name(&self) -> &str; - } -} - -impl AsName for Column { - fn as_name(&self) -> &str { - self.name() - } -} - -impl AsName for String { - fn as_name(&self) -> &str { - self - } -} - -/// A trait implemented by types that can index into columns of a row. -/// -/// This cannot be implemented outside of this crate. -pub trait RowIndex: Sealed { - #[doc(hidden)] - fn __idx(&self, columns: &[T]) -> Option - where - T: AsName; -} - -impl Sealed for usize {} - -impl RowIndex for usize { - #[inline] - fn __idx(&self, columns: &[T]) -> Option - where - T: AsName, - { - if *self >= columns.len() { - None - } else { - Some(*self) - } - } -} - -impl Sealed for str {} - -impl RowIndex for str { - #[inline] - fn __idx(&self, columns: &[T]) -> Option - where - T: AsName, - { - if let Some(idx) = columns.iter().position(|d| d.as_name() == self) { - return Some(idx); - }; - - // FIXME ASCII-only case insensitivity isn't really the right thing to - // do. Postgres itself uses a dubious wrapper around tolower and JDBC - // uses the US locale. - columns - .iter() - .position(|d| d.as_name().eq_ignore_ascii_case(self)) - } -} - -impl Sealed for &T where T: ?Sized + Sealed {} - -impl RowIndex for &T -where - T: ?Sized + RowIndex, -{ - #[inline] - fn __idx(&self, columns: &[U]) -> Option - where - U: AsName, - { - T::__idx(*self, columns) - } -} /// A row of data returned from the database by a query. pub struct Row { @@ -148,37 +62,33 @@ impl Row { /// # Panics /// /// Panics if the index is out of bounds or if the value cannot be converted to the specified type. - pub fn get<'a, I, T>(&'a self, idx: I) -> T + pub fn get<'a, T>(&'a self, idx: usize) -> T where - I: RowIndex + fmt::Display, T: FromSql<'a>, { - match self.get_inner(&idx) { + match self.get_inner(idx) { Ok(ok) => ok, Err(err) => panic!("error retrieving column {}: {}", idx, err), } } /// Like `Row::get`, but returns a `Result` rather than panicking. - pub fn try_get<'a, I, T>(&'a self, idx: I) -> Result + pub fn try_get<'a, T>(&'a self, idx: usize) -> Result where - I: RowIndex + fmt::Display, T: FromSql<'a>, { - self.get_inner(&idx) + self.get_inner(idx) } - fn get_inner<'a, I, T>(&'a self, idx: &I) -> Result + fn get_inner<'a, T>(&'a self, idx: usize) -> Result where - I: RowIndex + fmt::Display, T: FromSql<'a>, { - let idx = match idx.__idx(self.columns()) { - Some(idx) => idx, - None => return Err(Error::column(idx.to_string())), + let Some(column) = self.columns().get(idx) else { + return Err(Error::column(idx.to_string())); }; - let ty = self.columns()[idx].type_(); + let ty = column.type_(); if !T::accepts(ty) { return Err(Error::from_sql( Box::new(WrongType::new::(ty.clone())), @@ -216,85 +126,3 @@ impl Row { self.body.buffer().len() } } - -impl AsName for SimpleColumn { - fn as_name(&self) -> &str { - self.name() - } -} - -/// A row of data returned from the database by a simple query. -#[derive(Debug)] -pub struct SimpleQueryRow { - columns: Arc<[SimpleColumn]>, - body: DataRowBody, - ranges: Vec>>, -} - -impl SimpleQueryRow { - #[allow(clippy::new_ret_no_self)] - pub(crate) fn new( - columns: Arc<[SimpleColumn]>, - body: DataRowBody, - ) -> Result { - let ranges = body.ranges().collect().map_err(Error::parse)?; - Ok(SimpleQueryRow { - columns, - body, - ranges, - }) - } - - /// Returns information about the columns of data in the row. - pub fn columns(&self) -> &[SimpleColumn] { - &self.columns - } - - /// Determines if the row contains no values. - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Returns the number of values in the row. - pub fn len(&self) -> usize { - self.columns.len() - } - - /// Returns a value from the row. - /// - /// The value can be specified either by its numeric index in the row, or by its column name. - /// - /// # Panics - /// - /// Panics if the index is out of bounds or if the value cannot be converted to the specified type. - pub fn get(&self, idx: I) -> Option<&str> - where - I: RowIndex + fmt::Display, - { - match self.get_inner(&idx) { - Ok(ok) => ok, - Err(err) => panic!("error retrieving column {}: {}", idx, err), - } - } - - /// Like `SimpleQueryRow::get`, but returns a `Result` rather than panicking. - pub fn try_get(&self, idx: I) -> Result, Error> - where - I: RowIndex + fmt::Display, - { - self.get_inner(&idx) - } - - fn get_inner(&self, idx: &I) -> Result, Error> - where - I: RowIndex + fmt::Display, - { - let idx = match idx.__idx(&self.columns) { - Some(idx) => idx, - None => return Err(Error::column(idx.to_string())), - }; - - let buf = self.ranges[idx].clone().map(|r| &self.body.buffer()[r]); - FromSql::from_sql_nullable(&Type::TEXT, buf).map_err(|e| Error::from_sql(e, idx)) - } -} diff --git a/libs/proxy/tokio-postgres2/src/simple_query.rs b/libs/proxy/tokio-postgres2/src/simple_query.rs index d8d86cf902..9e9e455a87 100644 --- a/libs/proxy/tokio-postgres2/src/simple_query.rs +++ b/libs/proxy/tokio-postgres2/src/simple_query.rs @@ -1,35 +1,11 @@ -use crate::client::{InnerClient, Responses}; +use crate::client::InnerClient; use crate::codec::FrontendMessage; use crate::connection::RequestMessages; -use crate::{Error, ReadyForQueryStatus, SimpleQueryMessage, SimpleQueryRow}; +use crate::{Error, ReadyForQueryStatus}; use bytes::Bytes; -use fallible_iterator::FallibleIterator; -use futures_util::{ready, Stream}; use log::debug; -use pin_project_lite::pin_project; use postgres_protocol2::message::backend::Message; use postgres_protocol2::message::frontend; -use std::marker::PhantomPinned; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - -/// Information about a column of a single query row. -#[derive(Debug)] -pub struct SimpleColumn { - name: String, -} - -impl SimpleColumn { - pub(crate) fn new(name: String) -> SimpleColumn { - SimpleColumn { name } - } - - /// Returns the name of the column. - pub fn name(&self) -> &str { - &self.name - } -} pub async fn batch_execute( client: &mut InnerClient, @@ -58,71 +34,3 @@ pub(crate) fn encode(client: &mut InnerClient, query: &str) -> Result>, - status: ReadyForQueryStatus, - #[pin] - _p: PhantomPinned, - } -} - -impl SimpleQueryStream { - /// Returns if the connection is ready for querying, with the status of the connection. - /// - /// This might be available only after the stream has been exhausted. - pub fn ready_status(&self) -> ReadyForQueryStatus { - self.status - } -} - -impl Stream for SimpleQueryStream { - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - loop { - match ready!(this.responses.poll_next(cx)?) { - Message::CommandComplete(body) => { - let rows = body - .tag() - .map_err(Error::parse)? - .rsplit(' ') - .next() - .unwrap() - .parse() - .unwrap_or(0); - return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(rows)))); - } - Message::EmptyQueryResponse => { - return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(0)))); - } - Message::RowDescription(body) => { - let columns = body - .fields() - .map(|f| Ok(SimpleColumn::new(f.name().to_string()))) - .collect::>() - .map_err(Error::parse)? - .into(); - - *this.columns = Some(columns); - } - Message::DataRow(body) => { - let row = match &this.columns { - Some(columns) => SimpleQueryRow::new(columns.clone(), body)?, - None => return Poll::Ready(Some(Err(Error::unexpected_message()))), - }; - return Poll::Ready(Some(Ok(SimpleQueryMessage::Row(row)))); - } - Message::ReadyForQuery(s) => { - *this.status = s.into(); - return Poll::Ready(None); - } - _ => return Poll::Ready(Some(Err(Error::unexpected_message()))), - } - } - } -} diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index cac5a173cb..95e86d7b84 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -11,7 +11,7 @@ use smallvec::SmallVec; use tokio::net::TcpStream; use tokio::time::Instant; use tokio_util::sync::CancellationToken; -use tracing::{error, info, info_span, warn, Instrument}; +use tracing::{debug, error, info, info_span, Instrument}; #[cfg(test)] use { super::conn_pool_lib::GlobalConnPoolOptions, @@ -125,13 +125,10 @@ pub(crate) fn poll_client( match message { Some(Ok(AsyncMessage::Notice(notice))) => { - info!(%session_id, "notice: {}", notice); + debug!(%session_id, "notice: {}", notice); } Some(Ok(AsyncMessage::Notification(notif))) => { - warn!(%session_id, pid = notif.process_id(), channel = notif.channel(), "notification received"); - } - Some(Ok(_)) => { - warn!(%session_id, "unknown message"); + debug!(%session_id, pid = notif.process_id(), channel = notif.channel(), "notification received"); } Some(Err(e)) => { error!(%session_id, "connection error: {}", e); diff --git a/proxy/src/serverless/local_conn_pool.rs b/proxy/src/serverless/local_conn_pool.rs index ac7178131b..54096ccf98 100644 --- a/proxy/src/serverless/local_conn_pool.rs +++ b/proxy/src/serverless/local_conn_pool.rs @@ -29,7 +29,7 @@ use signature::Signer; use tokio::net::TcpStream; use tokio::time::Instant; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, info_span, warn, Instrument}; +use tracing::{debug, error, info, info_span, Instrument}; use super::backend::HttpConnError; use super::conn_pool_lib::{ @@ -228,13 +228,10 @@ pub(crate) fn poll_client( match message { Some(Ok(AsyncMessage::Notice(notice))) => { - info!(%session_id, "notice: {}", notice); + debug!(%session_id, "notice: {}", notice); } Some(Ok(AsyncMessage::Notification(notif))) => { - warn!(%session_id, pid = notif.process_id(), channel = notif.channel(), "notification received"); - } - Some(Ok(_)) => { - warn!(%session_id, "unknown message"); + debug!(%session_id, pid = notif.process_id(), channel = notif.channel(), "notification received"); } Some(Err(e)) => { error!(%session_id, "connection error: {}", e);