fix: regression with shortcutted statement on postgres extended query (#7340)

* fix: regression with shortcutted statement on postgres extended query

* chore: typo fix

* feat: also add more type support for parameters

* chore: remove dbg print
This commit is contained in:
Ning Sun
2025-12-05 10:08:23 +08:00
committed by GitHub
parent 84e4e42ee7
commit 2147545c90
4 changed files with 213 additions and 5 deletions

4
Cargo.lock generated
View File

@@ -9203,9 +9203,9 @@ dependencies = [
[[package]] [[package]]
name = "pgwire" name = "pgwire"
version = "0.36.1" version = "0.36.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d331bb0eef5bc83a221c0a85b1f205bccf094d4f72a26ae1d68a1b1c535123b7" checksum = "70a2bcdcc4b20a88e0648778ecf00415bbd5b447742275439c22176835056f99"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"base64 0.22.1", "base64 0.22.1",

View File

@@ -87,7 +87,7 @@ operator.workspace = true
otel-arrow-rust.workspace = true otel-arrow-rust.workspace = true
parking_lot.workspace = true parking_lot.workspace = true
pg_interval = "0.4" pg_interval = "0.4"
pgwire = { version = "0.36.1", default-features = false, features = [ pgwire = { version = "0.36.3", default-features = false, features = [
"server-api-ring", "server-api-ring",
"pg-ext-types", "pg-ext-types",
] } ] }

View File

@@ -461,8 +461,18 @@ impl ExtendedQueryHandler for PostgresServerHandlerInner {
// we will not support other show statements for extended query protocol at least for now. // we will not support other show statements for extended query protocol at least for now.
// because the return columns is not predictable at this stage // because the return columns is not predictable at this stage
_ => { _ => {
// fallback to NoData // test if query caught by fixture
Ok(DescribePortalResponse::new(vec![])) if let Some(mut resp) =
fixtures::process(&sql_plan.query, self.session.new_query_context())
&& let Response::Query(query_response) = resp.remove(0)
{
Ok(DescribePortalResponse::new(
(*query_response.row_schema()).clone(),
))
} else {
// fallback to NoData
Ok(DescribePortalResponse::new(vec![]))
}
} }
} }
} }

View File

@@ -1260,6 +1260,204 @@ pub(super) fn parameters_to_scalar_values(
ScalarValue::Null ScalarValue::Null
} }
} }
&Type::TIMESTAMP_ARRAY => {
let data = portal.parameter::<Vec<NaiveDateTime>>(idx, &client_type)?;
if let Some(data) = data {
if let Some(ConcreteDataType::List(list_type)) = &server_type {
match list_type.item_type() {
ConcreteDataType::Timestamp(unit) => match *unit {
TimestampType::Second(_) => {
let values = data
.into_iter()
.map(|ts| {
ScalarValue::TimestampSecond(
Some(ts.and_utc().timestamp()),
None,
)
})
.collect::<Vec<_>>();
ScalarValue::List(ScalarValue::new_list(
&values,
&ArrowDataType::Timestamp(TimeUnit::Second, None),
true,
))
}
TimestampType::Millisecond(_) => {
let values = data
.into_iter()
.map(|ts| {
ScalarValue::TimestampMillisecond(
Some(ts.and_utc().timestamp_millis()),
None,
)
})
.collect::<Vec<_>>();
ScalarValue::List(ScalarValue::new_list(
&values,
&ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
true,
))
}
TimestampType::Microsecond(_) => {
let values = data
.into_iter()
.map(|ts| {
ScalarValue::TimestampMicrosecond(
Some(ts.and_utc().timestamp_micros()),
None,
)
})
.collect::<Vec<_>>();
ScalarValue::List(ScalarValue::new_list(
&values,
&ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
true,
))
}
TimestampType::Nanosecond(_) => {
let values = data
.into_iter()
.filter_map(|ts| {
ts.and_utc().timestamp_nanos_opt().map(|nanos| {
ScalarValue::TimestampNanosecond(Some(nanos), None)
})
})
.collect::<Vec<_>>();
ScalarValue::List(ScalarValue::new_list(
&values,
&ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
true,
))
}
},
_ => {
return Err(invalid_parameter_error(
"invalid_parameter_type",
Some(format!(
"Expected: {}, found: {}",
list_type.item_type(),
client_type
)),
));
}
}
} else {
// Default to millisecond when no server type is specified
let values = data
.into_iter()
.map(|ts| {
ScalarValue::TimestampMillisecond(
Some(ts.and_utc().timestamp_millis()),
None,
)
})
.collect::<Vec<_>>();
ScalarValue::List(ScalarValue::new_list(
&values,
&ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
true,
))
}
} else {
ScalarValue::Null
}
}
&Type::TIMESTAMPTZ_ARRAY => {
let data = portal.parameter::<Vec<DateTime<FixedOffset>>>(idx, &client_type)?;
if let Some(data) = data {
if let Some(ConcreteDataType::List(list_type)) = &server_type {
match list_type.item_type() {
ConcreteDataType::Timestamp(unit) => match *unit {
TimestampType::Second(_) => {
let values = data
.into_iter()
.map(|ts| {
ScalarValue::TimestampSecond(Some(ts.timestamp()), None)
})
.collect::<Vec<_>>();
ScalarValue::List(ScalarValue::new_list(
&values,
&ArrowDataType::Timestamp(TimeUnit::Second, None),
true,
))
}
TimestampType::Millisecond(_) => {
let values = data
.into_iter()
.map(|ts| {
ScalarValue::TimestampMillisecond(
Some(ts.timestamp_millis()),
None,
)
})
.collect::<Vec<_>>();
ScalarValue::List(ScalarValue::new_list(
&values,
&ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
true,
))
}
TimestampType::Microsecond(_) => {
let values = data
.into_iter()
.map(|ts| {
ScalarValue::TimestampMicrosecond(
Some(ts.timestamp_micros()),
None,
)
})
.collect::<Vec<_>>();
ScalarValue::List(ScalarValue::new_list(
&values,
&ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
true,
))
}
TimestampType::Nanosecond(_) => {
let values = data
.into_iter()
.filter_map(|ts| {
ts.timestamp_nanos_opt().map(|nanos| {
ScalarValue::TimestampNanosecond(Some(nanos), None)
})
})
.collect::<Vec<_>>();
ScalarValue::List(ScalarValue::new_list(
&values,
&ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
true,
))
}
},
_ => {
return Err(invalid_parameter_error(
"invalid_parameter_type",
Some(format!(
"Expected: {}, found: {}",
list_type.item_type(),
client_type
)),
));
}
}
} else {
// Default to millisecond when no server type is specified
let values = data
.into_iter()
.map(|ts| {
ScalarValue::TimestampMillisecond(Some(ts.timestamp_millis()), None)
})
.collect::<Vec<_>>();
ScalarValue::List(ScalarValue::new_list(
&values,
&ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
true,
))
}
} else {
ScalarValue::Null
}
}
_ => Err(invalid_parameter_error( _ => Err(invalid_parameter_error(
"unsupported_parameter_value", "unsupported_parameter_value",
Some(format!("Found type: {}", client_type)), Some(format!("Found type: {}", client_type)),