From 3204efc860bcd6e849733cc7759b6742e6df8d8e Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 17 Feb 2025 16:19:57 +0000 Subject: [PATCH] chore(proxy): use specially named prepared statements for type-checking (#10843) I was looking into https://github.com/neondatabase/serverless/issues/144, I recall previous cases where proxy would trigger these prepared statements which would conflict with other statements prepared by our client downstream. Because of that, and also to aid in debugging, I've made sure all prepared statements that proxy needs to make have specific names that likely won't conflict and makes it clear in a error log if it's our statements that are causing issues --- libs/proxy/tokio-postgres2/src/client.rs | 98 +++---------------- .../tokio-postgres2/src/generic_client.rs | 9 +- libs/proxy/tokio-postgres2/src/lib.rs | 2 - libs/proxy/tokio-postgres2/src/prepare.rs | 48 ++------- libs/proxy/tokio-postgres2/src/query.rs | 43 -------- libs/proxy/tokio-postgres2/src/statement.rs | 10 +- .../proxy/tokio-postgres2/src/to_statement.rs | 57 ----------- proxy/src/serverless/backend.rs | 2 +- proxy/src/serverless/local_conn_pool.rs | 11 +-- 9 files changed, 36 insertions(+), 244 deletions(-) delete mode 100644 libs/proxy/tokio-postgres2/src/to_statement.rs diff --git a/libs/proxy/tokio-postgres2/src/client.rs b/libs/proxy/tokio-postgres2/src/client.rs index 9bbbd4c260..46151ab924 100644 --- a/libs/proxy/tokio-postgres2/src/client.rs +++ b/libs/proxy/tokio-postgres2/src/client.rs @@ -10,8 +10,8 @@ use crate::simple_query::SimpleQueryStream; use crate::types::{Oid, ToSql, Type}; use crate::{ - prepare, query, simple_query, slice_iter, CancelToken, Error, ReadyForQueryStatus, Row, - SimpleQueryMessage, Statement, ToStatement, Transaction, TransactionBuilder, + query, simple_query, slice_iter, CancelToken, Error, ReadyForQueryStatus, Row, + SimpleQueryMessage, Statement, Transaction, TransactionBuilder, }; use bytes::BytesMut; use fallible_iterator::FallibleIterator; @@ -54,18 +54,18 @@ impl Responses { } /// A cache of type info and prepared statements for fetching type info -/// (corresponding to the queries in the [prepare] module). +/// (corresponding to the queries in the [crate::prepare] module). #[derive(Default)] struct CachedTypeInfo { /// A statement for basic information for a type from its - /// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its + /// OID. Corresponds to [TYPEINFO_QUERY](crate::prepare::TYPEINFO_QUERY) (or its /// fallback). typeinfo: Option, /// A statement for getting information for a composite type from its OID. - /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY). + /// Corresponds to [TYPEINFO_QUERY](crate::prepare::TYPEINFO_COMPOSITE_QUERY). typeinfo_composite: Option, /// A statement for getting information for a composite type from its OID. - /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY) (or + /// Corresponds to [TYPEINFO_QUERY](crate::prepare::TYPEINFO_COMPOSITE_QUERY) (or /// its fallback). typeinfo_enum: Option, @@ -190,26 +190,6 @@ impl Client { &self.inner } - /// Creates a new prepared statement. - /// - /// Prepared statements can be executed repeatedly, and may contain query parameters (indicated by `$1`, `$2`, etc), - /// which are set when executed. Prepared statements can only be used with the connection that created them. - pub async fn prepare(&self, query: &str) -> Result { - self.prepare_typed(query, &[]).await - } - - /// Like `prepare`, but allows the types of query parameters to be explicitly specified. - /// - /// The list of types may be smaller than the number of parameters - the types of the remaining parameters will be - /// inferred. For example, `client.prepare_typed(query, &[])` is equivalent to `client.prepare(query)`. - pub async fn prepare_typed( - &self, - query: &str, - parameter_types: &[Type], - ) -> Result { - prepare::prepare(&self.inner, query, parameter_types).await - } - /// Executes a statement, returning a vector of the resulting rows. /// /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list @@ -222,14 +202,11 @@ impl Client { /// # Panics /// /// Panics if the number of parameters provided does not match the number expected. - pub async fn query( + pub async fn query( &self, - statement: &T, + statement: Statement, params: &[&(dyn ToSql + Sync)], - ) -> Result, Error> - where - T: ?Sized + ToStatement, - { + ) -> Result, Error> { self.query_raw(statement, slice_iter(params)) .await? .try_collect() @@ -250,13 +227,15 @@ impl Client { /// Panics if the number of parameters provided does not match the number expected. /// /// [`query`]: #method.query - pub async fn query_raw<'a, T, I>(&self, statement: &T, params: I) -> Result + pub async fn query_raw<'a, I>( + &self, + statement: Statement, + params: I, + ) -> Result where - T: ?Sized + ToStatement, I: IntoIterator, I::IntoIter: ExactSizeIterator, { - let statement = statement.__convert().into_statement(self).await?; query::query(&self.inner, statement, params).await } @@ -271,55 +250,6 @@ impl Client { query::query_txt(&self.inner, statement, params).await } - /// Executes a statement, returning the number of rows modified. - /// - /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list - /// provided, 1-indexed. - /// - /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be - /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front - /// with the `prepare` method. - /// - /// If the statement does not modify any rows (e.g. `SELECT`), 0 is returned. - /// - /// # Panics - /// - /// Panics if the number of parameters provided does not match the number expected. - pub async fn execute( - &self, - statement: &T, - params: &[&(dyn ToSql + Sync)], - ) -> Result - where - T: ?Sized + ToStatement, - { - self.execute_raw(statement, slice_iter(params)).await - } - - /// The maximally flexible version of [`execute`]. - /// - /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list - /// provided, 1-indexed. - /// - /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be - /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front - /// with the `prepare` method. - /// - /// # Panics - /// - /// Panics if the number of parameters provided does not match the number expected. - /// - /// [`execute`]: #method.execute - pub async fn execute_raw<'a, T, I>(&self, statement: &T, params: I) -> Result - where - T: ?Sized + ToStatement, - I: IntoIterator, - I::IntoIter: ExactSizeIterator, - { - let statement = statement.__convert().into_statement(self).await?; - query::execute(self.inner(), statement, params).await - } - /// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows. /// /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that diff --git a/libs/proxy/tokio-postgres2/src/generic_client.rs b/libs/proxy/tokio-postgres2/src/generic_client.rs index 768213f8ed..042b5a675e 100644 --- a/libs/proxy/tokio-postgres2/src/generic_client.rs +++ b/libs/proxy/tokio-postgres2/src/generic_client.rs @@ -1,7 +1,8 @@ +#![allow(async_fn_in_trait)] + use crate::query::RowStream; use crate::types::Type; use crate::{Client, Error, Transaction}; -use async_trait::async_trait; use postgres_protocol2::Oid; mod private { @@ -11,7 +12,6 @@ mod private { /// A trait allowing abstraction over connections and transactions. /// /// This trait is "sealed", and cannot be implemented outside of this crate. -#[async_trait] pub trait GenericClient: private::Sealed { /// Like `Client::query_raw_txt`. async fn query_raw_txt(&self, statement: &str, params: I) -> Result @@ -26,7 +26,6 @@ pub trait GenericClient: private::Sealed { impl private::Sealed for Client {} -#[async_trait] impl GenericClient for Client { async fn query_raw_txt(&self, statement: &str, params: I) -> Result where @@ -39,14 +38,12 @@ impl GenericClient for Client { /// Query for type information async fn get_type(&self, oid: Oid) -> Result { - self.get_type(oid).await + crate::prepare::get_type(self.inner(), oid).await } } impl private::Sealed for Transaction<'_> {} -#[async_trait] -#[allow(clippy::needless_lifetimes)] impl GenericClient for Transaction<'_> { async fn query_raw_txt(&self, statement: &str, params: I) -> Result where diff --git a/libs/proxy/tokio-postgres2/src/lib.rs b/libs/proxy/tokio-postgres2/src/lib.rs index 9155dd8279..7426279167 100644 --- a/libs/proxy/tokio-postgres2/src/lib.rs +++ b/libs/proxy/tokio-postgres2/src/lib.rs @@ -14,7 +14,6 @@ pub use crate::row::{Row, SimpleQueryRow}; pub use crate::simple_query::SimpleQueryStream; pub use crate::statement::{Column, Statement}; pub use crate::tls::NoTls; -pub use crate::to_statement::ToStatement; pub use crate::transaction::Transaction; pub use crate::transaction_builder::{IsolationLevel, TransactionBuilder}; use crate::types::ToSql; @@ -65,7 +64,6 @@ pub mod row; mod simple_query; mod statement; pub mod tls; -mod to_statement; mod transaction; mod transaction_builder; pub mod types; diff --git a/libs/proxy/tokio-postgres2/src/prepare.rs b/libs/proxy/tokio-postgres2/src/prepare.rs index da0c755c5b..58bbb26cbc 100644 --- a/libs/proxy/tokio-postgres2/src/prepare.rs +++ b/libs/proxy/tokio-postgres2/src/prepare.rs @@ -1,7 +1,6 @@ use crate::client::InnerClient; use crate::codec::FrontendMessage; use crate::connection::RequestMessages; -use crate::error::SqlState; use crate::types::{Field, Kind, Oid, Type}; use crate::{query, slice_iter}; use crate::{Column, Error, Statement}; @@ -13,7 +12,6 @@ use postgres_protocol2::message::backend::Message; use postgres_protocol2::message::frontend; use std::future::Future; use std::pin::Pin; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; pub(crate) const TYPEINFO_QUERY: &str = "\ @@ -24,14 +22,6 @@ INNER JOIN pg_catalog.pg_namespace n ON t.typnamespace = n.oid WHERE t.oid = $1 "; -// Range types weren't added until Postgres 9.2, so pg_range may not exist -const TYPEINFO_FALLBACK_QUERY: &str = "\ -SELECT t.typname, t.typtype, t.typelem, NULL::OID, t.typbasetype, n.nspname, t.typrelid -FROM pg_catalog.pg_type t -INNER JOIN pg_catalog.pg_namespace n ON t.typnamespace = n.oid -WHERE t.oid = $1 -"; - const TYPEINFO_ENUM_QUERY: &str = "\ SELECT enumlabel FROM pg_catalog.pg_enum @@ -39,14 +29,6 @@ WHERE enumtypid = $1 ORDER BY enumsortorder "; -// Postgres 9.0 didn't have enumsortorder -const TYPEINFO_ENUM_FALLBACK_QUERY: &str = "\ -SELECT enumlabel -FROM pg_catalog.pg_enum -WHERE enumtypid = $1 -ORDER BY oid -"; - pub(crate) const TYPEINFO_COMPOSITE_QUERY: &str = "\ SELECT attname, atttypid FROM pg_catalog.pg_attribute @@ -56,15 +38,13 @@ AND attnum > 0 ORDER BY attnum "; -static NEXT_ID: AtomicUsize = AtomicUsize::new(0); - pub async fn prepare( client: &Arc, + name: &'static str, query: &str, types: &[Type], ) -> Result { - let name = format!("s{}", NEXT_ID.fetch_add(1, Ordering::SeqCst)); - let buf = encode(client, &name, query, types)?; + let buf = encode(client, name, query, types)?; let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?; match responses.next().await? { @@ -105,10 +85,11 @@ pub async fn prepare( fn prepare_rec<'a>( client: &'a Arc, + name: &'static str, query: &'a str, types: &'a [Type], ) -> Pin> + 'a + Send>> { - Box::pin(prepare(client, query, types)) + Box::pin(prepare(client, name, query, types)) } fn encode(client: &InnerClient, name: &str, query: &str, types: &[Type]) -> Result { @@ -192,13 +173,8 @@ async fn typeinfo_statement(client: &Arc) -> Result stmt, - Err(ref e) if e.code() == Some(&SqlState::UNDEFINED_TABLE) => { - prepare_rec(client, TYPEINFO_FALLBACK_QUERY, &[]).await? - } - Err(e) => return Err(e), - }; + let typeinfo = "neon_proxy_typeinfo"; + let stmt = prepare_rec(client, typeinfo, TYPEINFO_QUERY, &[]).await?; client.set_typeinfo(&stmt); Ok(stmt) @@ -219,13 +195,8 @@ async fn typeinfo_enum_statement(client: &Arc) -> Result stmt, - Err(ref e) if e.code() == Some(&SqlState::UNDEFINED_COLUMN) => { - prepare_rec(client, TYPEINFO_ENUM_FALLBACK_QUERY, &[]).await? - } - Err(e) => return Err(e), - }; + let typeinfo = "neon_proxy_typeinfo_enum"; + let stmt = prepare_rec(client, typeinfo, TYPEINFO_ENUM_QUERY, &[]).await?; client.set_typeinfo_enum(&stmt); Ok(stmt) @@ -255,7 +226,8 @@ async fn typeinfo_composite_statement(client: &Arc) -> Result( - client: &InnerClient, - statement: Statement, - params: I, -) -> Result -where - I: IntoIterator, - I::IntoIter: ExactSizeIterator, -{ - let buf = if log_enabled!(Level::Debug) { - let params = params.into_iter().collect::>(); - debug!( - "executing statement {} with parameters: {:?}", - statement.name(), - BorrowToSqlParamsDebug(params.as_slice()), - ); - encode(client, &statement, params)? - } else { - encode(client, &statement, params)? - }; - let mut responses = start(client, buf).await?; - - let mut rows = 0; - loop { - match responses.next().await? { - Message::DataRow(_) => {} - Message::CommandComplete(body) => { - rows = body - .tag() - .map_err(Error::parse)? - .rsplit(' ') - .next() - .unwrap() - .parse() - .unwrap_or(0); - } - Message::EmptyQueryResponse => rows = 0, - Message::ReadyForQuery(_) => return Ok(rows), - _ => return Err(Error::unexpected_message()), - } - } -} - async fn start(client: &InnerClient, buf: Bytes) -> Result { let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?; diff --git a/libs/proxy/tokio-postgres2/src/statement.rs b/libs/proxy/tokio-postgres2/src/statement.rs index 22e160fc05..591872fbc5 100644 --- a/libs/proxy/tokio-postgres2/src/statement.rs +++ b/libs/proxy/tokio-postgres2/src/statement.rs @@ -13,7 +13,7 @@ use std::{ struct StatementInner { client: Weak, - name: String, + name: &'static str, params: Vec, columns: Vec, } @@ -22,7 +22,7 @@ impl Drop for StatementInner { fn drop(&mut self) { if let Some(client) = self.client.upgrade() { let buf = client.with_buf(|buf| { - frontend::close(b'S', &self.name, buf).unwrap(); + frontend::close(b'S', self.name, buf).unwrap(); frontend::sync(buf); buf.split().freeze() }); @@ -40,7 +40,7 @@ pub struct Statement(Arc); impl Statement { pub(crate) fn new( inner: &Arc, - name: String, + name: &'static str, params: Vec, columns: Vec, ) -> Statement { @@ -55,14 +55,14 @@ impl Statement { pub(crate) fn new_anonymous(params: Vec, columns: Vec) -> Statement { Statement(Arc::new(StatementInner { client: Weak::new(), - name: String::new(), + name: "", params, columns, })) } pub(crate) fn name(&self) -> &str { - &self.0.name + self.0.name } /// Returns the expected types of the statement's parameters. diff --git a/libs/proxy/tokio-postgres2/src/to_statement.rs b/libs/proxy/tokio-postgres2/src/to_statement.rs deleted file mode 100644 index 7e12992728..0000000000 --- a/libs/proxy/tokio-postgres2/src/to_statement.rs +++ /dev/null @@ -1,57 +0,0 @@ -use crate::to_statement::private::{Sealed, ToStatementType}; -use crate::Statement; - -mod private { - use crate::{Client, Error, Statement}; - - pub trait Sealed {} - - pub enum ToStatementType<'a> { - Statement(&'a Statement), - Query(&'a str), - } - - impl ToStatementType<'_> { - pub async fn into_statement(self, client: &Client) -> Result { - match self { - ToStatementType::Statement(s) => Ok(s.clone()), - ToStatementType::Query(s) => client.prepare(s).await, - } - } - } -} - -/// A trait abstracting over prepared and unprepared statements. -/// -/// Many methods are generic over this bound, so that they support both a raw query string as well as a statement which -/// was prepared previously. -/// -/// This trait is "sealed" and cannot be implemented by anything outside this crate. -pub trait ToStatement: Sealed { - #[doc(hidden)] - fn __convert(&self) -> ToStatementType<'_>; -} - -impl ToStatement for Statement { - fn __convert(&self) -> ToStatementType<'_> { - ToStatementType::Statement(self) - } -} - -impl Sealed for Statement {} - -impl ToStatement for str { - fn __convert(&self) -> ToStatementType<'_> { - ToStatementType::Query(self) - } -} - -impl Sealed for str {} - -impl ToStatement for String { - fn __convert(&self) -> ToStatementType<'_> { - ToStatementType::Query(self) - } -} - -impl Sealed for String {} diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index 6a59d413c4..f35c375ba2 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -372,7 +372,7 @@ impl PoolingBackend { debug!("setting up backend session state"); // initiates the auth session - if let Err(e) = client.execute("select auth.init()", &[]).await { + if let Err(e) = client.batch_execute("select auth.init();").await { discard.discard(); return Err(e.into()); } diff --git a/proxy/src/serverless/local_conn_pool.rs b/proxy/src/serverless/local_conn_pool.rs index fe33f0ff65..7ed514ff65 100644 --- a/proxy/src/serverless/local_conn_pool.rs +++ b/proxy/src/serverless/local_conn_pool.rs @@ -23,7 +23,6 @@ use indexmap::IndexMap; use jose_jwk::jose_b64::base64ct::{Base64UrlUnpadded, Encoding}; use parking_lot::RwLock; use postgres_client::tls::NoTlsStream; -use postgres_client::types::ToSql; use postgres_client::AsyncMessage; use serde_json::value::RawValue; use tokio::net::TcpStream; @@ -281,13 +280,9 @@ impl ClientInnerCommon { let token = resign_jwt(&local_data.key, payload, local_data.jti)?; // initiates the auth session - self.inner.batch_execute("discard all").await?; - self.inner - .execute( - "select auth.jwt_session_init($1)", - &[&&*token as &(dyn ToSql + Sync)], - ) - .await?; + // this is safe from query injections as the jwt format free of any escape characters. + let query = format!("discard all; select auth.jwt_session_init('{token}')"); + self.inner.batch_execute(&query).await?; let pid = self.inner.get_process_id(); info!(pid, jti = local_data.jti, "user session state init");