From a99d6eb3f97e6235d85f15ea4957cd4d6f88a6de Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Tue, 19 Mar 2024 03:11:05 -0700 Subject: [PATCH] feat: update pgwire to 0.20 for improved performance (#3538) --- Cargo.lock | 12 +++-- src/servers/Cargo.toml | 2 +- src/servers/src/postgres/handler.rs | 69 +++++++++++++++++------------ 3 files changed, 51 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c21ef54f62..385ae145c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -863,6 +863,12 @@ version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" +[[package]] +name = "base64" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9475866fec1451be56a3c2400fd081ff546538961565ccb5b7142cbd22bc7a51" + [[package]] name = "base64ct" version = "1.6.0" @@ -6697,12 +6703,12 @@ dependencies = [ [[package]] name = "pgwire" -version = "0.19.1" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17780c93587822c191c3f4d43fa5f6bc6df1e51b9f58a0be0cd1b7fd6e80d9e6" +checksum = "c00492c52bb65e0421211b7f4c5d9de7586e53786a3b244efb00f74851206bf6" dependencies = [ "async-trait", - "base64 0.21.5", + "base64 0.22.0", "bytes", "chrono", "derive-new 0.6.0", diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 95cbea167f..221ac3aa82 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -67,7 +67,7 @@ openmetrics-parser = "0.4" opensrv-mysql = "0.7.0" opentelemetry-proto.workspace = true parking_lot = "0.12" -pgwire = "0.19.1" +pgwire = "0.20" pin-project = "1.0" postgres-types = { version = "0.2", features = ["with-chrono-0_4"] } pprof = { version = "0.13", features = [ diff --git a/src/servers/src/postgres/handler.rs b/src/servers/src/postgres/handler.rs index 352d292758..711c2b30db 100644 --- a/src/servers/src/postgres/handler.rs +++ b/src/servers/src/postgres/handler.rs @@ -23,9 +23,11 @@ use common_telemetry::tracing; use datatypes::schema::SchemaRef; use futures::{future, stream, Stream, StreamExt}; use pgwire::api::portal::{Format, Portal}; -use pgwire::api::query::{ExtendedQueryHandler, SimpleQueryHandler, StatementOrPortal}; -use pgwire::api::results::{DataRowEncoder, DescribeResponse, QueryResponse, Response, Tag}; -use pgwire::api::stmt::QueryParser; +use pgwire::api::query::{ExtendedQueryHandler, SimpleQueryHandler}; +use pgwire::api::results::{ + DataRowEncoder, DescribePortalResponse, DescribeStatementResponse, QueryResponse, Response, Tag, +}; +use pgwire::api::stmt::{QueryParser, StoredStatement}; use pgwire::api::{ClientInfo, Type}; use pgwire::error::{ErrorInfo, PgWireError, PgWireResult}; use query::query_engine::DescribeResult; @@ -238,44 +240,55 @@ impl ExtendedQueryHandler for PostgresServerHandler { output_to_query_response(output, &portal.result_column_format) } - async fn do_describe( + async fn do_describe_statement( &self, _client: &mut C, - target: StatementOrPortal<'_, Self::Statement>, - ) -> PgWireResult + stmt: &StoredStatement, + ) -> PgWireResult where C: ClientInfo + Unpin + Send + Sync, { - let (param_types, sql_plan, format) = match target { - StatementOrPortal::Statement(stmt) => { - let sql_plan = &stmt.statement; - if let Some(plan) = &sql_plan.plan { - let param_types = plan - .get_param_types() - .map_err(|e| PgWireError::ApiError(Box::new(e)))?; + let sql_plan = &stmt.statement; + let (param_types, sql_plan, format) = if let Some(plan) = &sql_plan.plan { + let param_types = plan + .get_param_types() + .map_err(|e| PgWireError::ApiError(Box::new(e)))?; - let types = param_types_to_pg_types(¶m_types) - .map_err(|e| PgWireError::ApiError(Box::new(e)))?; + let types = param_types_to_pg_types(¶m_types) + .map_err(|e| PgWireError::ApiError(Box::new(e)))?; - (Some(types), sql_plan, &Format::UnifiedBinary) - } else { - let param_types = Some(stmt.parameter_types.clone()); - (param_types, sql_plan, &Format::UnifiedBinary) - } - } - StatementOrPortal::Portal(portal) => ( - None, - &portal.statement.statement, - &portal.result_column_format, - ), + (types, sql_plan, &Format::UnifiedBinary) + } else { + let param_types = stmt.parameter_types.clone(); + (param_types, sql_plan, &Format::UnifiedBinary) }; if let Some(schema) = &sql_plan.schema { schema_to_pg(schema, format) - .map(|fields| DescribeResponse::new(param_types, fields)) + .map(|fields| DescribeStatementResponse::new(param_types, fields)) .map_err(|e| PgWireError::ApiError(Box::new(e))) } else { - Ok(DescribeResponse::new(param_types, vec![])) + Ok(DescribeStatementResponse::new(param_types, vec![])) + } + } + + async fn do_describe_portal( + &self, + _client: &mut C, + portal: &Portal, + ) -> PgWireResult + where + C: ClientInfo + Unpin + Send + Sync, + { + let sql_plan = &portal.statement.statement; + let format = &portal.result_column_format; + + if let Some(schema) = &sql_plan.schema { + schema_to_pg(schema, format) + .map(DescribePortalResponse::new) + .map_err(|e| PgWireError::ApiError(Box::new(e))) + } else { + Ok(DescribePortalResponse::new(vec![])) } } }