feat: update pgwire to 0.20 for improved performance (#3538)

This commit is contained in:
Ning Sun
2024-03-19 03:11:05 -07:00
committed by GitHub
parent 2c115bc22a
commit a99d6eb3f9
3 changed files with 51 additions and 32 deletions

12
Cargo.lock generated
View File

@@ -863,6 +863,12 @@ version = "0.21.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9"
[[package]]
name = "base64"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9475866fec1451be56a3c2400fd081ff546538961565ccb5b7142cbd22bc7a51"
[[package]]
name = "base64ct"
version = "1.6.0"
@@ -6697,12 +6703,12 @@ dependencies = [
[[package]]
name = "pgwire"
version = "0.19.1"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17780c93587822c191c3f4d43fa5f6bc6df1e51b9f58a0be0cd1b7fd6e80d9e6"
checksum = "c00492c52bb65e0421211b7f4c5d9de7586e53786a3b244efb00f74851206bf6"
dependencies = [
"async-trait",
"base64 0.21.5",
"base64 0.22.0",
"bytes",
"chrono",
"derive-new 0.6.0",

View File

@@ -67,7 +67,7 @@ openmetrics-parser = "0.4"
opensrv-mysql = "0.7.0"
opentelemetry-proto.workspace = true
parking_lot = "0.12"
pgwire = "0.19.1"
pgwire = "0.20"
pin-project = "1.0"
postgres-types = { version = "0.2", features = ["with-chrono-0_4"] }
pprof = { version = "0.13", features = [

View File

@@ -23,9 +23,11 @@ use common_telemetry::tracing;
use datatypes::schema::SchemaRef;
use futures::{future, stream, Stream, StreamExt};
use pgwire::api::portal::{Format, Portal};
use pgwire::api::query::{ExtendedQueryHandler, SimpleQueryHandler, StatementOrPortal};
use pgwire::api::results::{DataRowEncoder, DescribeResponse, QueryResponse, Response, Tag};
use pgwire::api::stmt::QueryParser;
use pgwire::api::query::{ExtendedQueryHandler, SimpleQueryHandler};
use pgwire::api::results::{
DataRowEncoder, DescribePortalResponse, DescribeStatementResponse, QueryResponse, Response, Tag,
};
use pgwire::api::stmt::{QueryParser, StoredStatement};
use pgwire::api::{ClientInfo, Type};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use query::query_engine::DescribeResult;
@@ -238,44 +240,55 @@ impl ExtendedQueryHandler for PostgresServerHandler {
output_to_query_response(output, &portal.result_column_format)
}
async fn do_describe<C>(
async fn do_describe_statement<C>(
&self,
_client: &mut C,
target: StatementOrPortal<'_, Self::Statement>,
) -> PgWireResult<DescribeResponse>
stmt: &StoredStatement<Self::Statement>,
) -> PgWireResult<DescribeStatementResponse>
where
C: ClientInfo + Unpin + Send + Sync,
{
let (param_types, sql_plan, format) = match target {
StatementOrPortal::Statement(stmt) => {
let sql_plan = &stmt.statement;
if let Some(plan) = &sql_plan.plan {
let param_types = plan
.get_param_types()
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
let sql_plan = &stmt.statement;
let (param_types, sql_plan, format) = if let Some(plan) = &sql_plan.plan {
let param_types = plan
.get_param_types()
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
let types = param_types_to_pg_types(&param_types)
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
let types = param_types_to_pg_types(&param_types)
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
(Some(types), sql_plan, &Format::UnifiedBinary)
} else {
let param_types = Some(stmt.parameter_types.clone());
(param_types, sql_plan, &Format::UnifiedBinary)
}
}
StatementOrPortal::Portal(portal) => (
None,
&portal.statement.statement,
&portal.result_column_format,
),
(types, sql_plan, &Format::UnifiedBinary)
} else {
let param_types = stmt.parameter_types.clone();
(param_types, sql_plan, &Format::UnifiedBinary)
};
if let Some(schema) = &sql_plan.schema {
schema_to_pg(schema, format)
.map(|fields| DescribeResponse::new(param_types, fields))
.map(|fields| DescribeStatementResponse::new(param_types, fields))
.map_err(|e| PgWireError::ApiError(Box::new(e)))
} else {
Ok(DescribeResponse::new(param_types, vec![]))
Ok(DescribeStatementResponse::new(param_types, vec![]))
}
}
async fn do_describe_portal<C>(
&self,
_client: &mut C,
portal: &Portal<Self::Statement>,
) -> PgWireResult<DescribePortalResponse>
where
C: ClientInfo + Unpin + Send + Sync,
{
let sql_plan = &portal.statement.statement;
let format = &portal.result_column_format;
if let Some(schema) = &sql_plan.schema {
schema_to_pg(schema, format)
.map(DescribePortalResponse::new)
.map_err(|e| PgWireError::ApiError(Box::new(e)))
} else {
Ok(DescribePortalResponse::new(vec![]))
}
}
}