mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-27 08:29:59 +00:00
feat: list/array/timezone support for postgres output (#4727)
* feat: list/array support for postgres output * fix: implement time zone support for postgrsql * feat: add a geohash function that returns array * fix: typo * fix: lint warnings * test: add sqlness test * refactor: check resolution range before convert value * fix: test result for sqlness * feat: upgrade pgwire apis
This commit is contained in:
42
Cargo.lock
generated
42
Cargo.lock
generated
@@ -3303,9 +3303,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "derive-new"
|
||||
version = "0.6.0"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d150dea618e920167e5973d70ae6ece4385b7164e0d799fe7c122dd0a5d912ad"
|
||||
checksum = "2cdc8d50f426189eef89dac62fabfa0abb27d5cc008f25bf4156a0203325becc"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -5760,6 +5760,29 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lazy-regex"
|
||||
version = "3.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8d8e41c97e6bc7ecb552016274b99fbb5d035e8de288c582d9b933af6677bfda"
|
||||
dependencies = [
|
||||
"lazy-regex-proc_macros",
|
||||
"once_cell",
|
||||
"regex-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lazy-regex-proc_macros"
|
||||
version = "3.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "76e1d8b05d672c53cb9c7b920bbba8783845ae4f0b076e02a3db1d02c81b4163"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"regex",
|
||||
"syn 2.0.66",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lazy_static"
|
||||
version = "1.4.0"
|
||||
@@ -7856,20 +7879,22 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "pgwire"
|
||||
version = "0.22.0"
|
||||
version = "0.24.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3770f56e1e8a608c6de40011b9a00c6b669c14d121024411701b4bc3b2a5be99"
|
||||
checksum = "ed4ca46dd335b3a030d977be54dfe121b1b9fe22aa8bbd69161ac2434524fc68"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"derive-new 0.6.0",
|
||||
"derive-new 0.7.0",
|
||||
"futures",
|
||||
"hex",
|
||||
"lazy-regex",
|
||||
"md5",
|
||||
"postgres-types",
|
||||
"rand",
|
||||
"ring 0.17.8",
|
||||
"rust_decimal",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-rustls 0.26.0",
|
||||
@@ -9075,6 +9100,12 @@ dependencies = [
|
||||
"regex-syntax 0.8.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "regex-lite"
|
||||
version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a"
|
||||
|
||||
[[package]]
|
||||
name = "regex-syntax"
|
||||
version = "0.6.29"
|
||||
@@ -9550,6 +9581,7 @@ dependencies = [
|
||||
"borsh",
|
||||
"bytes",
|
||||
"num-traits",
|
||||
"postgres-types",
|
||||
"rand",
|
||||
"rkyv",
|
||||
"serde",
|
||||
|
||||
@@ -16,7 +16,7 @@ use std::sync::Arc;
|
||||
mod geohash;
|
||||
mod h3;
|
||||
|
||||
use geohash::GeohashFunction;
|
||||
use geohash::{GeohashFunction, GeohashNeighboursFunction};
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
|
||||
@@ -26,6 +26,7 @@ impl GeoFunctions {
|
||||
pub fn register(registry: &FunctionRegistry) {
|
||||
// geohash
|
||||
registry.register(Arc::new(GeohashFunction));
|
||||
registry.register(Arc::new(GeohashNeighboursFunction));
|
||||
// h3 family
|
||||
registry.register(Arc::new(h3::H3LatLngToCell));
|
||||
registry.register(Arc::new(h3::H3LatLngToCellString));
|
||||
|
||||
@@ -20,23 +20,69 @@ use common_query::error::{self, InvalidFuncArgsSnafu, Result};
|
||||
use common_query::prelude::{Signature, TypeSignature};
|
||||
use datafusion::logical_expr::Volatility;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{MutableVector, StringVectorBuilder, VectorRef};
|
||||
use datatypes::scalars::{Scalar, ScalarVectorBuilder};
|
||||
use datatypes::value::{ListValue, Value};
|
||||
use datatypes::vectors::{ListVectorBuilder, MutableVector, StringVectorBuilder, VectorRef};
|
||||
use geohash::Coord;
|
||||
use snafu::{ensure, ResultExt};
|
||||
|
||||
use crate::function::{Function, FunctionContext};
|
||||
|
||||
macro_rules! ensure_resolution_usize {
|
||||
($v: ident) => {
|
||||
if !($v > 0 && $v <= 12) {
|
||||
Err(BoxedError::new(PlainError::new(
|
||||
format!("Invalid geohash resolution {}, expect value: [1, 12]", $v),
|
||||
StatusCode::EngineExecuteQuery,
|
||||
)))
|
||||
.context(error::ExecuteSnafu)
|
||||
} else {
|
||||
Ok($v as usize)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
fn try_into_resolution(v: Value) -> Result<usize> {
|
||||
match v {
|
||||
Value::Int8(v) => {
|
||||
ensure_resolution_usize!(v)
|
||||
}
|
||||
Value::Int16(v) => {
|
||||
ensure_resolution_usize!(v)
|
||||
}
|
||||
Value::Int32(v) => {
|
||||
ensure_resolution_usize!(v)
|
||||
}
|
||||
Value::Int64(v) => {
|
||||
ensure_resolution_usize!(v)
|
||||
}
|
||||
Value::UInt8(v) => {
|
||||
ensure_resolution_usize!(v)
|
||||
}
|
||||
Value::UInt16(v) => {
|
||||
ensure_resolution_usize!(v)
|
||||
}
|
||||
Value::UInt32(v) => {
|
||||
ensure_resolution_usize!(v)
|
||||
}
|
||||
Value::UInt64(v) => {
|
||||
ensure_resolution_usize!(v)
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Function that return geohash string for a given geospatial coordinate.
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct GeohashFunction;
|
||||
|
||||
const NAME: &str = "geohash";
|
||||
impl GeohashFunction {
|
||||
const NAME: &'static str = "geohash";
|
||||
}
|
||||
|
||||
impl Function for GeohashFunction {
|
||||
fn name(&self) -> &str {
|
||||
NAME
|
||||
Self::NAME
|
||||
}
|
||||
|
||||
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
|
||||
@@ -93,17 +139,7 @@ impl Function for GeohashFunction {
|
||||
for i in 0..size {
|
||||
let lat = lat_vec.get(i).as_f64_lossy();
|
||||
let lon = lon_vec.get(i).as_f64_lossy();
|
||||
let r = match resolution_vec.get(i) {
|
||||
Value::Int8(v) => v as usize,
|
||||
Value::Int16(v) => v as usize,
|
||||
Value::Int32(v) => v as usize,
|
||||
Value::Int64(v) => v as usize,
|
||||
Value::UInt8(v) => v as usize,
|
||||
Value::UInt16(v) => v as usize,
|
||||
Value::UInt32(v) => v as usize,
|
||||
Value::UInt64(v) => v as usize,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let r = try_into_resolution(resolution_vec.get(i))?;
|
||||
|
||||
let result = match (lat, lon) {
|
||||
(Some(lat), Some(lon)) => {
|
||||
@@ -130,6 +166,134 @@ impl Function for GeohashFunction {
|
||||
|
||||
impl fmt::Display for GeohashFunction {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}", NAME)
|
||||
write!(f, "{}", Self::NAME)
|
||||
}
|
||||
}
|
||||
|
||||
/// Function that return geohash string for a given geospatial coordinate.
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct GeohashNeighboursFunction;
|
||||
|
||||
impl GeohashNeighboursFunction {
|
||||
const NAME: &'static str = "geohash_neighbours";
|
||||
}
|
||||
|
||||
impl Function for GeohashNeighboursFunction {
|
||||
fn name(&self) -> &str {
|
||||
GeohashNeighboursFunction::NAME
|
||||
}
|
||||
|
||||
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
|
||||
Ok(ConcreteDataType::list_datatype(
|
||||
ConcreteDataType::string_datatype(),
|
||||
))
|
||||
}
|
||||
|
||||
fn signature(&self) -> Signature {
|
||||
let mut signatures = Vec::new();
|
||||
for coord_type in &[
|
||||
ConcreteDataType::float32_datatype(),
|
||||
ConcreteDataType::float64_datatype(),
|
||||
] {
|
||||
for resolution_type in &[
|
||||
ConcreteDataType::int8_datatype(),
|
||||
ConcreteDataType::int16_datatype(),
|
||||
ConcreteDataType::int32_datatype(),
|
||||
ConcreteDataType::int64_datatype(),
|
||||
ConcreteDataType::uint8_datatype(),
|
||||
ConcreteDataType::uint16_datatype(),
|
||||
ConcreteDataType::uint32_datatype(),
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
] {
|
||||
signatures.push(TypeSignature::Exact(vec![
|
||||
// latitude
|
||||
coord_type.clone(),
|
||||
// longitude
|
||||
coord_type.clone(),
|
||||
// resolution
|
||||
resolution_type.clone(),
|
||||
]));
|
||||
}
|
||||
}
|
||||
Signature::one_of(signatures, Volatility::Stable)
|
||||
}
|
||||
|
||||
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
|
||||
ensure!(
|
||||
columns.len() == 3,
|
||||
InvalidFuncArgsSnafu {
|
||||
err_msg: format!(
|
||||
"The length of the args is not correct, expect 3, provided : {}",
|
||||
columns.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
let lat_vec = &columns[0];
|
||||
let lon_vec = &columns[1];
|
||||
let resolution_vec = &columns[2];
|
||||
|
||||
let size = lat_vec.len();
|
||||
let mut results =
|
||||
ListVectorBuilder::with_type_capacity(ConcreteDataType::string_datatype(), size);
|
||||
|
||||
for i in 0..size {
|
||||
let lat = lat_vec.get(i).as_f64_lossy();
|
||||
let lon = lon_vec.get(i).as_f64_lossy();
|
||||
let r = try_into_resolution(resolution_vec.get(i))?;
|
||||
|
||||
let result = match (lat, lon) {
|
||||
(Some(lat), Some(lon)) => {
|
||||
let coord = Coord { x: lon, y: lat };
|
||||
let encoded = geohash::encode(coord, r)
|
||||
.map_err(|e| {
|
||||
BoxedError::new(PlainError::new(
|
||||
format!("Geohash error: {}", e),
|
||||
StatusCode::EngineExecuteQuery,
|
||||
))
|
||||
})
|
||||
.context(error::ExecuteSnafu)?;
|
||||
let neighbours = geohash::neighbors(&encoded)
|
||||
.map_err(|e| {
|
||||
BoxedError::new(PlainError::new(
|
||||
format!("Geohash error: {}", e),
|
||||
StatusCode::EngineExecuteQuery,
|
||||
))
|
||||
})
|
||||
.context(error::ExecuteSnafu)?;
|
||||
Some(ListValue::new(
|
||||
vec![
|
||||
neighbours.n,
|
||||
neighbours.nw,
|
||||
neighbours.w,
|
||||
neighbours.sw,
|
||||
neighbours.s,
|
||||
neighbours.se,
|
||||
neighbours.e,
|
||||
neighbours.ne,
|
||||
]
|
||||
.into_iter()
|
||||
.map(Value::from)
|
||||
.collect(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
))
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
|
||||
if let Some(list_value) = result {
|
||||
results.push(Some(list_value.as_scalar_ref()));
|
||||
} else {
|
||||
results.push(None);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(results.to_vector())
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for GeohashNeighboursFunction {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}", GeohashNeighboursFunction::NAME)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,7 +71,7 @@ openmetrics-parser = "0.4"
|
||||
opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "6bbc3b65e6b19212c4f7fc4f40c20daf6f452deb" }
|
||||
opentelemetry-proto.workspace = true
|
||||
parking_lot = "0.12"
|
||||
pgwire = { version = "0.22", default-features = false, features = ["server-api-ring"] }
|
||||
pgwire = { version = "0.24.2", default-features = false, features = ["server-api-ring"] }
|
||||
pin-project = "1.0"
|
||||
pipeline.workspace = true
|
||||
postgres-types = { version = "0.2", features = ["with-chrono-0_4", "with-serde_json-1"] }
|
||||
|
||||
@@ -32,7 +32,8 @@ use std::sync::Arc;
|
||||
use ::auth::UserProviderRef;
|
||||
use derive_builder::Builder;
|
||||
use pgwire::api::auth::ServerParameterProvider;
|
||||
use pgwire::api::ClientInfo;
|
||||
use pgwire::api::copy::NoopCopyHandler;
|
||||
use pgwire::api::{ClientInfo, PgWireHandlerFactory};
|
||||
pub use server::PostgresServer;
|
||||
use session::context::Channel;
|
||||
use session::Session;
|
||||
@@ -68,7 +69,7 @@ impl ServerParameterProvider for GreptimeDBStartupParameters {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PostgresServerHandler {
|
||||
pub struct PostgresServerHandlerInner {
|
||||
query_handler: ServerSqlQueryHandlerRef,
|
||||
login_verifier: PgLoginVerifier,
|
||||
force_tls: bool,
|
||||
@@ -87,10 +88,35 @@ pub(crate) struct MakePostgresServerHandler {
|
||||
force_tls: bool,
|
||||
}
|
||||
|
||||
pub(crate) struct PostgresServerHandler(Arc<PostgresServerHandlerInner>);
|
||||
|
||||
impl PgWireHandlerFactory for PostgresServerHandler {
|
||||
type StartupHandler = PostgresServerHandlerInner;
|
||||
type SimpleQueryHandler = PostgresServerHandlerInner;
|
||||
type ExtendedQueryHandler = PostgresServerHandlerInner;
|
||||
type CopyHandler = NoopCopyHandler;
|
||||
|
||||
fn simple_query_handler(&self) -> Arc<Self::SimpleQueryHandler> {
|
||||
self.0.clone()
|
||||
}
|
||||
|
||||
fn extended_query_handler(&self) -> Arc<Self::ExtendedQueryHandler> {
|
||||
self.0.clone()
|
||||
}
|
||||
|
||||
fn startup_handler(&self) -> Arc<Self::StartupHandler> {
|
||||
self.0.clone()
|
||||
}
|
||||
|
||||
fn copy_handler(&self) -> Arc<Self::CopyHandler> {
|
||||
Arc::new(NoopCopyHandler)
|
||||
}
|
||||
}
|
||||
|
||||
impl MakePostgresServerHandler {
|
||||
fn make(&self, addr: Option<SocketAddr>) -> PostgresServerHandler {
|
||||
let session = Arc::new(Session::new(addr, Channel::Postgres, Default::default()));
|
||||
PostgresServerHandler {
|
||||
let handler = PostgresServerHandlerInner {
|
||||
query_handler: self.query_handler.clone(),
|
||||
login_verifier: PgLoginVerifier::new(self.user_provider.clone()),
|
||||
force_tls: self.force_tls,
|
||||
@@ -98,6 +124,7 @@ impl MakePostgresServerHandler {
|
||||
|
||||
session: session.clone(),
|
||||
query_parser: Arc::new(DefaultQueryParser::new(self.query_handler.clone(), session)),
|
||||
}
|
||||
};
|
||||
PostgresServerHandler(Arc::new(handler))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ use pgwire::messages::{PgWireBackendMessage, PgWireFrontendMessage};
|
||||
use session::Session;
|
||||
use snafu::IntoError;
|
||||
|
||||
use super::PostgresServerHandler;
|
||||
use super::PostgresServerHandlerInner;
|
||||
use crate::error::{AuthSnafu, Result};
|
||||
use crate::metrics::METRIC_AUTH_FAILURE;
|
||||
use crate::postgres::types::PgErrorCode;
|
||||
@@ -127,7 +127,7 @@ where
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl StartupHandler for PostgresServerHandler {
|
||||
impl StartupHandler for PostgresServerHandlerInner {
|
||||
async fn on_startup<C>(
|
||||
&self,
|
||||
client: &mut C,
|
||||
|
||||
@@ -39,13 +39,13 @@ use sql::dialect::PostgreSqlDialect;
|
||||
use sql::parser::{ParseOptions, ParserContext};
|
||||
|
||||
use super::types::*;
|
||||
use super::{fixtures, PostgresServerHandler};
|
||||
use super::{fixtures, PostgresServerHandlerInner};
|
||||
use crate::error::Result;
|
||||
use crate::query_handler::sql::ServerSqlQueryHandlerRef;
|
||||
use crate::SqlPlan;
|
||||
|
||||
#[async_trait]
|
||||
impl SimpleQueryHandler for PostgresServerHandler {
|
||||
impl SimpleQueryHandler for PostgresServerHandlerInner {
|
||||
#[tracing::instrument(skip_all, fields(protocol = "postgres"))]
|
||||
async fn do_query<'a, C>(
|
||||
&self,
|
||||
@@ -237,7 +237,7 @@ impl QueryParser for DefaultQueryParser {
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ExtendedQueryHandler for PostgresServerHandler {
|
||||
impl ExtendedQueryHandler for PostgresServerHandlerInner {
|
||||
type Statement = SqlPlan;
|
||||
type QueryParser = DefaultQueryParser;
|
||||
|
||||
|
||||
@@ -94,14 +94,8 @@ impl PostgresServer {
|
||||
let _handle = io_runtime.spawn(async move {
|
||||
crate::metrics::METRIC_POSTGRES_CONNECTIONS.inc();
|
||||
let pg_handler = Arc::new(handler_maker.make(addr));
|
||||
let r = process_socket(
|
||||
io_stream,
|
||||
tls_acceptor.clone(),
|
||||
pg_handler.clone(),
|
||||
pg_handler.clone(),
|
||||
pg_handler,
|
||||
)
|
||||
.await;
|
||||
let r =
|
||||
process_socket(io_stream, tls_acceptor.clone(), pg_handler).await;
|
||||
crate::metrics::METRIC_POSTGRES_CONNECTIONS.dec();
|
||||
r
|
||||
});
|
||||
|
||||
@@ -20,13 +20,14 @@ mod interval;
|
||||
use std::collections::HashMap;
|
||||
use std::ops::Deref;
|
||||
|
||||
use chrono::{NaiveDate, NaiveDateTime};
|
||||
use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
|
||||
use common_time::Interval;
|
||||
use datafusion_common::ScalarValue;
|
||||
use datafusion_expr::LogicalPlan;
|
||||
use datatypes::prelude::{ConcreteDataType, Value};
|
||||
use datatypes::schema::Schema;
|
||||
use datatypes::types::TimestampType;
|
||||
use datatypes::value::ListValue;
|
||||
use pgwire::api::portal::{Format, Portal};
|
||||
use pgwire::api::results::{DataRowEncoder, FieldInfo};
|
||||
use pgwire::api::Type;
|
||||
@@ -58,6 +59,317 @@ pub(super) fn schema_to_pg(origin: &Schema, field_formats: &Format) -> Result<Ve
|
||||
.collect::<Result<Vec<FieldInfo>>>()
|
||||
}
|
||||
|
||||
fn encode_array(
|
||||
query_ctx: &QueryContextRef,
|
||||
value_list: &ListValue,
|
||||
builder: &mut DataRowEncoder,
|
||||
) -> PgWireResult<()> {
|
||||
match value_list.datatype() {
|
||||
&ConcreteDataType::Boolean(_) => {
|
||||
let array = value_list
|
||||
.items()
|
||||
.iter()
|
||||
.map(|v| match v {
|
||||
Value::Null => Ok(None),
|
||||
Value::Boolean(v) => Ok(Some(*v)),
|
||||
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!("Invalid list item type, find {v:?}, expected bool",),
|
||||
}))),
|
||||
})
|
||||
.collect::<PgWireResult<Vec<Option<bool>>>>()?;
|
||||
builder.encode_field(&array)
|
||||
}
|
||||
&ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => {
|
||||
let array = value_list
|
||||
.items()
|
||||
.iter()
|
||||
.map(|v| match v {
|
||||
Value::Null => Ok(None),
|
||||
Value::Int8(v) => Ok(Some(*v)),
|
||||
Value::UInt8(v) => Ok(Some(*v as i8)),
|
||||
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!(
|
||||
"Invalid list item type, find {v:?}, expected int8 or uint8",
|
||||
),
|
||||
}))),
|
||||
})
|
||||
.collect::<PgWireResult<Vec<Option<i8>>>>()?;
|
||||
builder.encode_field(&array)
|
||||
}
|
||||
&ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => {
|
||||
let array = value_list
|
||||
.items()
|
||||
.iter()
|
||||
.map(|v| match v {
|
||||
Value::Null => Ok(None),
|
||||
Value::Int16(v) => Ok(Some(*v)),
|
||||
Value::UInt16(v) => Ok(Some(*v as i16)),
|
||||
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!(
|
||||
"Invalid list item type, find {v:?}, expected int16 or uint16",
|
||||
),
|
||||
}))),
|
||||
})
|
||||
.collect::<PgWireResult<Vec<Option<i16>>>>()?;
|
||||
builder.encode_field(&array)
|
||||
}
|
||||
&ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => {
|
||||
let array = value_list
|
||||
.items()
|
||||
.iter()
|
||||
.map(|v| match v {
|
||||
Value::Null => Ok(None),
|
||||
Value::Int32(v) => Ok(Some(*v)),
|
||||
Value::UInt32(v) => Ok(Some(*v as i32)),
|
||||
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!(
|
||||
"Invalid list item type, find {v:?}, expected int32 or uint32",
|
||||
),
|
||||
}))),
|
||||
})
|
||||
.collect::<PgWireResult<Vec<Option<i32>>>>()?;
|
||||
builder.encode_field(&array)
|
||||
}
|
||||
&ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => {
|
||||
let array = value_list
|
||||
.items()
|
||||
.iter()
|
||||
.map(|v| match v {
|
||||
Value::Null => Ok(None),
|
||||
Value::Int64(v) => Ok(Some(*v)),
|
||||
Value::UInt64(v) => Ok(Some(*v as i64)),
|
||||
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!(
|
||||
"Invalid list item type, find {v:?}, expected int64 or uint64",
|
||||
),
|
||||
}))),
|
||||
})
|
||||
.collect::<PgWireResult<Vec<Option<i64>>>>()?;
|
||||
builder.encode_field(&array)
|
||||
}
|
||||
&ConcreteDataType::Float32(_) => {
|
||||
let array = value_list
|
||||
.items()
|
||||
.iter()
|
||||
.map(|v| match v {
|
||||
Value::Null => Ok(None),
|
||||
Value::Float32(v) => Ok(Some(v.0)),
|
||||
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!("Invalid list item type, find {v:?}, expected float32",),
|
||||
}))),
|
||||
})
|
||||
.collect::<PgWireResult<Vec<Option<f32>>>>()?;
|
||||
builder.encode_field(&array)
|
||||
}
|
||||
&ConcreteDataType::Float64(_) => {
|
||||
let array = value_list
|
||||
.items()
|
||||
.iter()
|
||||
.map(|v| match v {
|
||||
Value::Null => Ok(None),
|
||||
Value::Float64(v) => Ok(Some(v.0)),
|
||||
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!("Invalid list item type, find {v:?}, expected float64",),
|
||||
}))),
|
||||
})
|
||||
.collect::<PgWireResult<Vec<Option<f64>>>>()?;
|
||||
builder.encode_field(&array)
|
||||
}
|
||||
&ConcreteDataType::Binary(_) => {
|
||||
let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
|
||||
|
||||
match *bytea_output {
|
||||
PGByteaOutputValue::ESCAPE => {
|
||||
let array = value_list
|
||||
.items()
|
||||
.iter()
|
||||
.map(|v| match v {
|
||||
Value::Null => Ok(None),
|
||||
Value::Binary(v) => Ok(Some(EscapeOutputBytea(v.deref()))),
|
||||
|
||||
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!(
|
||||
"Invalid list item type, find {v:?}, expected binary",
|
||||
),
|
||||
}))),
|
||||
})
|
||||
.collect::<PgWireResult<Vec<Option<EscapeOutputBytea>>>>()?;
|
||||
builder.encode_field(&array)
|
||||
}
|
||||
PGByteaOutputValue::HEX => {
|
||||
let array = value_list
|
||||
.items()
|
||||
.iter()
|
||||
.map(|v| match v {
|
||||
Value::Null => Ok(None),
|
||||
Value::Binary(v) => Ok(Some(HexOutputBytea(v.deref()))),
|
||||
|
||||
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!(
|
||||
"Invalid list item type, find {v:?}, expected binary",
|
||||
),
|
||||
}))),
|
||||
})
|
||||
.collect::<PgWireResult<Vec<Option<HexOutputBytea>>>>()?;
|
||||
builder.encode_field(&array)
|
||||
}
|
||||
}
|
||||
}
|
||||
&ConcreteDataType::String(_) => {
|
||||
let array = value_list
|
||||
.items()
|
||||
.iter()
|
||||
.map(|v| match v {
|
||||
Value::Null => Ok(None),
|
||||
Value::String(v) => Ok(Some(v.as_utf8())),
|
||||
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!("Invalid list item type, find {v:?}, expected string",),
|
||||
}))),
|
||||
})
|
||||
.collect::<PgWireResult<Vec<Option<&str>>>>()?;
|
||||
builder.encode_field(&array)
|
||||
}
|
||||
&ConcreteDataType::Date(_) => {
|
||||
let array = value_list
|
||||
.items()
|
||||
.iter()
|
||||
.map(|v| match v {
|
||||
Value::Null => Ok(None),
|
||||
Value::Date(v) => {
|
||||
if let Some(date) = v.to_chrono_date() {
|
||||
let (style, order) =
|
||||
*query_ctx.configuration_parameter().pg_datetime_style();
|
||||
Ok(Some(StylingDate(date, style, order)))
|
||||
} else {
|
||||
Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!("Failed to convert date to postgres type {v:?}",),
|
||||
})))
|
||||
}
|
||||
}
|
||||
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!("Invalid list item type, find {v:?}, expected date",),
|
||||
}))),
|
||||
})
|
||||
.collect::<PgWireResult<Vec<Option<StylingDate>>>>()?;
|
||||
builder.encode_field(&array)
|
||||
}
|
||||
&ConcreteDataType::DateTime(_) => {
|
||||
let array = value_list
|
||||
.items()
|
||||
.iter()
|
||||
.map(|v| match v {
|
||||
Value::Null => Ok(None),
|
||||
Value::DateTime(v) => {
|
||||
if let Some(datetime) =
|
||||
v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone()))
|
||||
{
|
||||
let (style, order) =
|
||||
*query_ctx.configuration_parameter().pg_datetime_style();
|
||||
Ok(Some(StylingDateTime(datetime, style, order)))
|
||||
} else {
|
||||
Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!("Failed to convert date to postgres type {v:?}",),
|
||||
})))
|
||||
}
|
||||
}
|
||||
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!("Invalid list item type, find {v:?}, expected date",),
|
||||
}))),
|
||||
})
|
||||
.collect::<PgWireResult<Vec<Option<StylingDateTime>>>>()?;
|
||||
builder.encode_field(&array)
|
||||
}
|
||||
&ConcreteDataType::Timestamp(_) => {
|
||||
let array = value_list
|
||||
.items()
|
||||
.iter()
|
||||
.map(|v| match v {
|
||||
Value::Null => Ok(None),
|
||||
Value::Timestamp(v) => {
|
||||
if let Some(datetime) =
|
||||
v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone()))
|
||||
{
|
||||
let (style, order) =
|
||||
*query_ctx.configuration_parameter().pg_datetime_style();
|
||||
Ok(Some(StylingDateTime(datetime, style, order)))
|
||||
} else {
|
||||
Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!("Failed to convert date to postgres type {v:?}",),
|
||||
})))
|
||||
}
|
||||
}
|
||||
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!("Invalid list item type, find {v:?}, expected timestamp",),
|
||||
}))),
|
||||
})
|
||||
.collect::<PgWireResult<Vec<Option<StylingDateTime>>>>()?;
|
||||
builder.encode_field(&array)
|
||||
}
|
||||
&ConcreteDataType::Time(_) => {
|
||||
let array = value_list
|
||||
.items()
|
||||
.iter()
|
||||
.map(|v| match v {
|
||||
Value::Null => Ok(None),
|
||||
Value::Time(v) => Ok(v.to_chrono_time()),
|
||||
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!("Invalid list item type, find {v:?}, expected time",),
|
||||
}))),
|
||||
})
|
||||
.collect::<PgWireResult<Vec<Option<NaiveTime>>>>()?;
|
||||
builder.encode_field(&array)
|
||||
}
|
||||
&ConcreteDataType::Interval(_) => {
|
||||
let array = value_list
|
||||
.items()
|
||||
.iter()
|
||||
.map(|v| match v {
|
||||
Value::Null => Ok(None),
|
||||
Value::Interval(v) => Ok(Some(PgInterval::from(*v))),
|
||||
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!("Invalid list item type, find {v:?}, expected interval",),
|
||||
}))),
|
||||
})
|
||||
.collect::<PgWireResult<Vec<Option<PgInterval>>>>()?;
|
||||
builder.encode_field(&array)
|
||||
}
|
||||
&ConcreteDataType::Decimal128(_) => {
|
||||
let array = value_list
|
||||
.items()
|
||||
.iter()
|
||||
.map(|v| match v {
|
||||
Value::Null => Ok(None),
|
||||
Value::Decimal128(v) => Ok(Some(v.to_string())),
|
||||
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!("Invalid list item type, find {v:?}, expected decimal",),
|
||||
}))),
|
||||
})
|
||||
.collect::<PgWireResult<Vec<Option<String>>>>()?;
|
||||
builder.encode_field(&array)
|
||||
}
|
||||
&ConcreteDataType::Json(_) => {
|
||||
let array = value_list
|
||||
.items()
|
||||
.iter()
|
||||
.map(|v| match v {
|
||||
Value::Null => Ok(None),
|
||||
Value::Binary(v) => Ok(Some(jsonb::to_string(v))),
|
||||
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!("Invalid list item type, find {v:?}, expected json",),
|
||||
}))),
|
||||
})
|
||||
.collect::<PgWireResult<Vec<Option<String>>>>()?;
|
||||
builder.encode_field(&array)
|
||||
}
|
||||
_ => Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!(
|
||||
"cannot write array type {:?} in postgres protocol: unimplemented",
|
||||
value_list.datatype()
|
||||
),
|
||||
}))),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn encode_value(
|
||||
query_ctx: &QueryContextRef,
|
||||
value: &Value,
|
||||
@@ -93,7 +405,7 @@ pub(super) fn encode_value(
|
||||
Value::Date(v) => {
|
||||
if let Some(date) = v.to_chrono_date() {
|
||||
let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
|
||||
builder.encode_field(&StylingDate(&date, style, order))
|
||||
builder.encode_field(&StylingDate(date, style, order))
|
||||
} else {
|
||||
Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!("Failed to convert date to postgres type {v:?}",),
|
||||
@@ -101,9 +413,10 @@ pub(super) fn encode_value(
|
||||
}
|
||||
}
|
||||
Value::DateTime(v) => {
|
||||
if let Some(datetime) = v.to_chrono_datetime() {
|
||||
if let Some(datetime) = v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone()))
|
||||
{
|
||||
let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
|
||||
builder.encode_field(&StylingDateTime(&datetime, style, order))
|
||||
builder.encode_field(&StylingDateTime(datetime, style, order))
|
||||
} else {
|
||||
Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!("Failed to convert date to postgres type {v:?}",),
|
||||
@@ -111,9 +424,10 @@ pub(super) fn encode_value(
|
||||
}
|
||||
}
|
||||
Value::Timestamp(v) => {
|
||||
if let Some(datetime) = v.to_chrono_datetime() {
|
||||
if let Some(datetime) = v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone()))
|
||||
{
|
||||
let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
|
||||
builder.encode_field(&StylingDateTime(&datetime, style, order))
|
||||
builder.encode_field(&StylingDateTime(datetime, style, order))
|
||||
} else {
|
||||
Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!("Failed to convert date to postgres type {v:?}",),
|
||||
@@ -131,14 +445,13 @@ pub(super) fn encode_value(
|
||||
}
|
||||
Value::Interval(v) => builder.encode_field(&PgInterval::from(*v)),
|
||||
Value::Decimal128(v) => builder.encode_field(&v.to_string()),
|
||||
Value::List(_) | Value::Duration(_) => {
|
||||
Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!(
|
||||
"cannot write value {:?} in postgres protocol: unimplemented",
|
||||
&value
|
||||
),
|
||||
})))
|
||||
}
|
||||
Value::List(values) => encode_array(query_ctx, values, builder),
|
||||
Value::Duration(_) => Err(PgWireError::ApiError(Box::new(Error::Internal {
|
||||
err_msg: format!(
|
||||
"cannot write value {:?} in postgres protocol: unimplemented",
|
||||
&value
|
||||
),
|
||||
}))),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -155,19 +468,45 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
|
||||
&ConcreteDataType::Binary(_) => Ok(Type::BYTEA),
|
||||
&ConcreteDataType::String(_) => Ok(Type::VARCHAR),
|
||||
&ConcreteDataType::Date(_) => Ok(Type::DATE),
|
||||
&ConcreteDataType::DateTime(_) => Ok(Type::TIMESTAMP),
|
||||
&ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP),
|
||||
&ConcreteDataType::DateTime(_) | &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP),
|
||||
&ConcreteDataType::Time(_) => Ok(Type::TIME),
|
||||
&ConcreteDataType::Interval(_) => Ok(Type::INTERVAL),
|
||||
&ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC),
|
||||
&ConcreteDataType::Json(_) => Ok(Type::JSON),
|
||||
&ConcreteDataType::Duration(_)
|
||||
| &ConcreteDataType::List(_)
|
||||
| &ConcreteDataType::Dictionary(_) => server_error::UnsupportedDataTypeSnafu {
|
||||
data_type: origin,
|
||||
reason: "not implemented",
|
||||
ConcreteDataType::List(list) => match list.item_type() {
|
||||
&ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
|
||||
&ConcreteDataType::Boolean(_) => Ok(Type::BOOL_ARRAY),
|
||||
&ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => Ok(Type::CHAR_ARRAY),
|
||||
&ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT2_ARRAY),
|
||||
&ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT4_ARRAY),
|
||||
&ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => Ok(Type::INT8_ARRAY),
|
||||
&ConcreteDataType::Float32(_) => Ok(Type::FLOAT4_ARRAY),
|
||||
&ConcreteDataType::Float64(_) => Ok(Type::FLOAT8_ARRAY),
|
||||
&ConcreteDataType::Binary(_) => Ok(Type::BYTEA_ARRAY),
|
||||
&ConcreteDataType::String(_) => Ok(Type::VARCHAR_ARRAY),
|
||||
&ConcreteDataType::Date(_) => Ok(Type::DATE_ARRAY),
|
||||
&ConcreteDataType::DateTime(_) | &ConcreteDataType::Timestamp(_) => {
|
||||
Ok(Type::TIMESTAMP_ARRAY)
|
||||
}
|
||||
&ConcreteDataType::Time(_) => Ok(Type::TIME_ARRAY),
|
||||
&ConcreteDataType::Interval(_) => Ok(Type::INTERVAL_ARRAY),
|
||||
&ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC_ARRAY),
|
||||
&ConcreteDataType::Json(_) => Ok(Type::JSON_ARRAY),
|
||||
&ConcreteDataType::Duration(_)
|
||||
| &ConcreteDataType::Dictionary(_)
|
||||
| &ConcreteDataType::List(_) => server_error::UnsupportedDataTypeSnafu {
|
||||
data_type: origin,
|
||||
reason: "not implemented",
|
||||
}
|
||||
.fail(),
|
||||
},
|
||||
&ConcreteDataType::Duration(_) | &ConcreteDataType::Dictionary(_) => {
|
||||
server_error::UnsupportedDataTypeSnafu {
|
||||
data_type: origin,
|
||||
reason: "not implemented",
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -621,6 +960,7 @@ mod test {
|
||||
|
||||
use common_time::interval::IntervalUnit;
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use common_time::Timestamp;
|
||||
use datatypes::schema::{ColumnSchema, Schema};
|
||||
use datatypes::value::ListValue;
|
||||
use pgwire::api::results::{FieldFormat, FieldInfo};
|
||||
@@ -816,6 +1156,34 @@ mod test {
|
||||
Type::INTERVAL,
|
||||
FieldFormat::Text,
|
||||
),
|
||||
FieldInfo::new(
|
||||
"int_list".into(),
|
||||
None,
|
||||
None,
|
||||
Type::INT8_ARRAY,
|
||||
FieldFormat::Text,
|
||||
),
|
||||
FieldInfo::new(
|
||||
"float_list".into(),
|
||||
None,
|
||||
None,
|
||||
Type::FLOAT8_ARRAY,
|
||||
FieldFormat::Text,
|
||||
),
|
||||
FieldInfo::new(
|
||||
"string_list".into(),
|
||||
None,
|
||||
None,
|
||||
Type::VARCHAR_ARRAY,
|
||||
FieldFormat::Text,
|
||||
),
|
||||
FieldInfo::new(
|
||||
"timestamp_list".into(),
|
||||
None,
|
||||
None,
|
||||
Type::TIMESTAMP_ARRAY,
|
||||
FieldFormat::Text,
|
||||
),
|
||||
];
|
||||
|
||||
let datatypes = vec![
|
||||
@@ -846,6 +1214,10 @@ mod test {
|
||||
ConcreteDataType::datetime_datatype(),
|
||||
ConcreteDataType::timestamp_datatype(TimeUnit::Second),
|
||||
ConcreteDataType::interval_datatype(IntervalUnit::YearMonth),
|
||||
ConcreteDataType::list_datatype(ConcreteDataType::int64_datatype()),
|
||||
ConcreteDataType::list_datatype(ConcreteDataType::float64_datatype()),
|
||||
ConcreteDataType::list_datatype(ConcreteDataType::string_datatype()),
|
||||
ConcreteDataType::list_datatype(ConcreteDataType::timestamp_second_datatype()),
|
||||
];
|
||||
let values = vec![
|
||||
Value::Null,
|
||||
@@ -875,6 +1247,22 @@ mod test {
|
||||
Value::DateTime(1000001i64.into()),
|
||||
Value::Timestamp(1000001i64.into()),
|
||||
Value::Interval(1000001i128.into()),
|
||||
Value::List(ListValue::new(
|
||||
vec![Value::Int64(1i64)],
|
||||
ConcreteDataType::int64_datatype(),
|
||||
)),
|
||||
Value::List(ListValue::new(
|
||||
vec![Value::Float64(1.0f64.into())],
|
||||
ConcreteDataType::float64_datatype(),
|
||||
)),
|
||||
Value::List(ListValue::new(
|
||||
vec![Value::String("tom".into())],
|
||||
ConcreteDataType::string_datatype(),
|
||||
)),
|
||||
Value::List(ListValue::new(
|
||||
vec![Value::Timestamp(Timestamp::new(1i64, TimeUnit::Second))],
|
||||
ConcreteDataType::timestamp_second_datatype(),
|
||||
)),
|
||||
];
|
||||
let query_context = QueryContextBuilder::default()
|
||||
.configuration_parameter(Default::default())
|
||||
@@ -884,22 +1272,6 @@ mod test {
|
||||
for (value, datatype) in values.iter().zip(datatypes) {
|
||||
encode_value(&query_context, value, &mut builder, &datatype).unwrap();
|
||||
}
|
||||
|
||||
let err = encode_value(
|
||||
&query_context,
|
||||
&Value::List(ListValue::new(vec![], ConcreteDataType::int16_datatype())),
|
||||
&mut builder,
|
||||
&ConcreteDataType::list_datatype(ConcreteDataType::int16_datatype()),
|
||||
)
|
||||
.unwrap_err();
|
||||
match err {
|
||||
PgWireError::ApiError(e) => {
|
||||
assert!(format!("{e}").contains("Internal error:"));
|
||||
}
|
||||
_ => {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -19,10 +19,10 @@ use postgres_types::{IsNull, ToSql, Type};
|
||||
use session::session_config::{PGDateOrder, PGDateTimeStyle};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct StylingDate<'a>(pub &'a NaiveDate, pub PGDateTimeStyle, pub PGDateOrder);
|
||||
pub struct StylingDate(pub NaiveDate, pub PGDateTimeStyle, pub PGDateOrder);
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct StylingDateTime<'a>(pub &'a NaiveDateTime, pub PGDateTimeStyle, pub PGDateOrder);
|
||||
pub struct StylingDateTime(pub NaiveDateTime, pub PGDateTimeStyle, pub PGDateOrder);
|
||||
|
||||
fn date_format_string(style: PGDateTimeStyle, order: PGDateOrder) -> &'static str {
|
||||
match style {
|
||||
@@ -53,7 +53,7 @@ fn datetime_format_string(style: PGDateTimeStyle, order: PGDateOrder) -> &'stati
|
||||
},
|
||||
}
|
||||
}
|
||||
impl ToSqlText for StylingDate<'_> {
|
||||
impl ToSqlText for StylingDate {
|
||||
fn to_sql_text(
|
||||
&self,
|
||||
ty: &Type,
|
||||
@@ -78,7 +78,7 @@ impl ToSqlText for StylingDate<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
impl ToSqlText for StylingDateTime<'_> {
|
||||
impl ToSqlText for StylingDateTime {
|
||||
fn to_sql_text(
|
||||
&self,
|
||||
ty: &Type,
|
||||
@@ -112,7 +112,7 @@ impl ToSqlText for StylingDateTime<'_> {
|
||||
|
||||
macro_rules! delegate_to_sql {
|
||||
($delegator:ident, $delegatee:ident) => {
|
||||
impl ToSql for $delegator<'_> {
|
||||
impl ToSql for $delegator {
|
||||
fn to_sql(
|
||||
&self,
|
||||
ty: &Type,
|
||||
@@ -148,7 +148,7 @@ mod tests {
|
||||
let naive_date = NaiveDate::from_ymd_opt(1997, 12, 17).unwrap();
|
||||
|
||||
{
|
||||
let styling_date = StylingDate(&naive_date, PGDateTimeStyle::ISO, PGDateOrder::MDY);
|
||||
let styling_date = StylingDate(naive_date, PGDateTimeStyle::ISO, PGDateOrder::MDY);
|
||||
let expected = "1997-12-17";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap();
|
||||
@@ -157,7 +157,7 @@ mod tests {
|
||||
}
|
||||
|
||||
{
|
||||
let styling_date = StylingDate(&naive_date, PGDateTimeStyle::ISO, PGDateOrder::YMD);
|
||||
let styling_date = StylingDate(naive_date, PGDateTimeStyle::ISO, PGDateOrder::YMD);
|
||||
let expected = "1997-12-17";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap();
|
||||
@@ -166,7 +166,7 @@ mod tests {
|
||||
}
|
||||
|
||||
{
|
||||
let styling_date = StylingDate(&naive_date, PGDateTimeStyle::ISO, PGDateOrder::DMY);
|
||||
let styling_date = StylingDate(naive_date, PGDateTimeStyle::ISO, PGDateOrder::DMY);
|
||||
let expected = "1997-12-17";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap();
|
||||
@@ -175,7 +175,7 @@ mod tests {
|
||||
}
|
||||
|
||||
{
|
||||
let styling_date = StylingDate(&naive_date, PGDateTimeStyle::German, PGDateOrder::MDY);
|
||||
let styling_date = StylingDate(naive_date, PGDateTimeStyle::German, PGDateOrder::MDY);
|
||||
let expected = "17.12.1997";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap();
|
||||
@@ -184,7 +184,7 @@ mod tests {
|
||||
}
|
||||
|
||||
{
|
||||
let styling_date = StylingDate(&naive_date, PGDateTimeStyle::German, PGDateOrder::YMD);
|
||||
let styling_date = StylingDate(naive_date, PGDateTimeStyle::German, PGDateOrder::YMD);
|
||||
let expected = "17.12.1997";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap();
|
||||
@@ -193,7 +193,7 @@ mod tests {
|
||||
}
|
||||
|
||||
{
|
||||
let styling_date = StylingDate(&naive_date, PGDateTimeStyle::German, PGDateOrder::DMY);
|
||||
let styling_date = StylingDate(naive_date, PGDateTimeStyle::German, PGDateOrder::DMY);
|
||||
let expected = "17.12.1997";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap();
|
||||
@@ -202,8 +202,7 @@ mod tests {
|
||||
}
|
||||
|
||||
{
|
||||
let styling_date =
|
||||
StylingDate(&naive_date, PGDateTimeStyle::Postgres, PGDateOrder::MDY);
|
||||
let styling_date = StylingDate(naive_date, PGDateTimeStyle::Postgres, PGDateOrder::MDY);
|
||||
let expected = "12-17-1997";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap();
|
||||
@@ -212,8 +211,7 @@ mod tests {
|
||||
}
|
||||
|
||||
{
|
||||
let styling_date =
|
||||
StylingDate(&naive_date, PGDateTimeStyle::Postgres, PGDateOrder::YMD);
|
||||
let styling_date = StylingDate(naive_date, PGDateTimeStyle::Postgres, PGDateOrder::YMD);
|
||||
let expected = "12-17-1997";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap();
|
||||
@@ -222,8 +220,7 @@ mod tests {
|
||||
}
|
||||
|
||||
{
|
||||
let styling_date =
|
||||
StylingDate(&naive_date, PGDateTimeStyle::Postgres, PGDateOrder::DMY);
|
||||
let styling_date = StylingDate(naive_date, PGDateTimeStyle::Postgres, PGDateOrder::DMY);
|
||||
let expected = "17-12-1997";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap();
|
||||
@@ -232,7 +229,7 @@ mod tests {
|
||||
}
|
||||
|
||||
{
|
||||
let styling_date = StylingDate(&naive_date, PGDateTimeStyle::SQL, PGDateOrder::MDY);
|
||||
let styling_date = StylingDate(naive_date, PGDateTimeStyle::SQL, PGDateOrder::MDY);
|
||||
let expected = "12/17/1997";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap();
|
||||
@@ -241,7 +238,7 @@ mod tests {
|
||||
}
|
||||
|
||||
{
|
||||
let styling_date = StylingDate(&naive_date, PGDateTimeStyle::SQL, PGDateOrder::YMD);
|
||||
let styling_date = StylingDate(naive_date, PGDateTimeStyle::SQL, PGDateOrder::YMD);
|
||||
let expected = "12/17/1997";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap();
|
||||
@@ -250,7 +247,7 @@ mod tests {
|
||||
}
|
||||
|
||||
{
|
||||
let styling_date = StylingDate(&naive_date, PGDateTimeStyle::SQL, PGDateOrder::DMY);
|
||||
let styling_date = StylingDate(naive_date, PGDateTimeStyle::SQL, PGDateOrder::DMY);
|
||||
let expected = "17/12/1997";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_date.to_sql_text(&Type::DATE, &mut out).unwrap();
|
||||
@@ -266,7 +263,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
{
|
||||
let styling_datetime = StylingDateTime(&input, PGDateTimeStyle::ISO, PGDateOrder::MDY);
|
||||
let styling_datetime = StylingDateTime(input, PGDateTimeStyle::ISO, PGDateOrder::MDY);
|
||||
let expected = "2021-09-01 12:34:56.789012";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_datetime
|
||||
@@ -277,7 +274,7 @@ mod tests {
|
||||
}
|
||||
|
||||
{
|
||||
let styling_datetime = StylingDateTime(&input, PGDateTimeStyle::ISO, PGDateOrder::YMD);
|
||||
let styling_datetime = StylingDateTime(input, PGDateTimeStyle::ISO, PGDateOrder::YMD);
|
||||
let expected = "2021-09-01 12:34:56.789012";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_datetime
|
||||
@@ -288,7 +285,7 @@ mod tests {
|
||||
}
|
||||
|
||||
{
|
||||
let styling_datetime = StylingDateTime(&input, PGDateTimeStyle::ISO, PGDateOrder::DMY);
|
||||
let styling_datetime = StylingDateTime(input, PGDateTimeStyle::ISO, PGDateOrder::DMY);
|
||||
let expected = "2021-09-01 12:34:56.789012";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_datetime
|
||||
@@ -300,7 +297,7 @@ mod tests {
|
||||
|
||||
{
|
||||
let styling_datetime =
|
||||
StylingDateTime(&input, PGDateTimeStyle::German, PGDateOrder::MDY);
|
||||
StylingDateTime(input, PGDateTimeStyle::German, PGDateOrder::MDY);
|
||||
let expected = "01.09.2021 12:34:56.789012";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_datetime
|
||||
@@ -312,7 +309,7 @@ mod tests {
|
||||
|
||||
{
|
||||
let styling_datetime =
|
||||
StylingDateTime(&input, PGDateTimeStyle::German, PGDateOrder::YMD);
|
||||
StylingDateTime(input, PGDateTimeStyle::German, PGDateOrder::YMD);
|
||||
let expected = "01.09.2021 12:34:56.789012";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_datetime
|
||||
@@ -324,7 +321,7 @@ mod tests {
|
||||
|
||||
{
|
||||
let styling_datetime =
|
||||
StylingDateTime(&input, PGDateTimeStyle::German, PGDateOrder::DMY);
|
||||
StylingDateTime(input, PGDateTimeStyle::German, PGDateOrder::DMY);
|
||||
let expected = "01.09.2021 12:34:56.789012";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_datetime
|
||||
@@ -336,7 +333,7 @@ mod tests {
|
||||
|
||||
{
|
||||
let styling_datetime =
|
||||
StylingDateTime(&input, PGDateTimeStyle::Postgres, PGDateOrder::MDY);
|
||||
StylingDateTime(input, PGDateTimeStyle::Postgres, PGDateOrder::MDY);
|
||||
let expected = "Wed Sep 01 12:34:56.789012 2021";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_datetime
|
||||
@@ -348,7 +345,7 @@ mod tests {
|
||||
|
||||
{
|
||||
let styling_datetime =
|
||||
StylingDateTime(&input, PGDateTimeStyle::Postgres, PGDateOrder::YMD);
|
||||
StylingDateTime(input, PGDateTimeStyle::Postgres, PGDateOrder::YMD);
|
||||
let expected = "Wed Sep 01 12:34:56.789012 2021";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_datetime
|
||||
@@ -360,7 +357,7 @@ mod tests {
|
||||
|
||||
{
|
||||
let styling_datetime =
|
||||
StylingDateTime(&input, PGDateTimeStyle::Postgres, PGDateOrder::DMY);
|
||||
StylingDateTime(input, PGDateTimeStyle::Postgres, PGDateOrder::DMY);
|
||||
let expected = "Wed 01 Sep 12:34:56.789012 2021";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_datetime
|
||||
@@ -371,7 +368,7 @@ mod tests {
|
||||
}
|
||||
|
||||
{
|
||||
let styling_datetime = StylingDateTime(&input, PGDateTimeStyle::SQL, PGDateOrder::MDY);
|
||||
let styling_datetime = StylingDateTime(input, PGDateTimeStyle::SQL, PGDateOrder::MDY);
|
||||
let expected = "09/01/2021 12:34:56.789012";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_datetime
|
||||
@@ -382,7 +379,7 @@ mod tests {
|
||||
}
|
||||
|
||||
{
|
||||
let styling_datetime = StylingDateTime(&input, PGDateTimeStyle::SQL, PGDateOrder::YMD);
|
||||
let styling_datetime = StylingDateTime(input, PGDateTimeStyle::SQL, PGDateOrder::YMD);
|
||||
let expected = "09/01/2021 12:34:56.789012";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_datetime
|
||||
@@ -393,7 +390,7 @@ mod tests {
|
||||
}
|
||||
|
||||
{
|
||||
let styling_datetime = StylingDateTime(&input, PGDateTimeStyle::SQL, PGDateOrder::DMY);
|
||||
let styling_datetime = StylingDateTime(input, PGDateTimeStyle::SQL, PGDateOrder::DMY);
|
||||
let expected = "01/09/2021 12:34:56.789012";
|
||||
let mut out = bytes::BytesMut::new();
|
||||
let is_null = styling_datetime
|
||||
|
||||
@@ -69,6 +69,7 @@ macro_rules! sql_tests {
|
||||
test_postgres_bytea,
|
||||
test_postgres_datestyle,
|
||||
test_postgres_parameter_inference,
|
||||
test_postgres_array_types,
|
||||
test_mysql_prepare_stmt_insert_timestamp,
|
||||
);
|
||||
)*
|
||||
@@ -1111,3 +1112,34 @@ pub async fn test_mysql_prepare_stmt_insert_timestamp(store_type: StorageType) {
|
||||
let _ = server.shutdown().await;
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_postgres_array_types(store_type: StorageType) {
|
||||
let (addr, mut guard, fe_pg_server) = setup_pg_server(store_type, "sql_inference").await;
|
||||
|
||||
let (client, connection) = tokio_postgres::connect(&format!("postgres://{addr}/public"), NoTls)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
tokio::spawn(async move {
|
||||
connection.await.unwrap();
|
||||
tx.send(()).unwrap();
|
||||
});
|
||||
|
||||
let rows = client
|
||||
.query(
|
||||
"SELECT arrow_cast(1, 'List(Int8)'), arrow_cast('tom', 'List(Utf8)'), arrow_cast(3.14, 'List(Float32)'), arrow_cast('2023-01-02T12:53:02', 'List(Timestamp(Millisecond, None))')",
|
||||
&[],
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(1, rows.len());
|
||||
|
||||
// Shutdown the client.
|
||||
drop(client);
|
||||
rx.await.unwrap();
|
||||
|
||||
let _ = fe_pg_server.shutdown().await;
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
@@ -158,11 +158,11 @@ SELECT geohash(37.76938, -122.3889, 11);
|
||||
|
||||
SELECT geohash(37.76938, -122.3889, 100);
|
||||
|
||||
Error: 3001(EngineExecuteQuery), Geohash error: Invalid length specified: 100. Accepted values are between 1 and 12, inclusive
|
||||
Error: 3001(EngineExecuteQuery), Invalid geohash resolution 100, expect value: [1, 12]
|
||||
|
||||
SELECT geohash(37.76938, -122.3889, -1);
|
||||
|
||||
Error: 3001(EngineExecuteQuery), Geohash error: Invalid length specified: 18446744073709551615. Accepted values are between 1 and 12, inclusive
|
||||
Error: 3001(EngineExecuteQuery), Invalid geohash resolution -1, expect value: [1, 12]
|
||||
|
||||
SELECT geohash(37.76938, -122.3889, 11::Int8);
|
||||
|
||||
@@ -228,3 +228,11 @@ SELECT geohash(37.76938, -122.3889, 11::UInt64);
|
||||
| 9q8yygxneft |
|
||||
+------------------------------------------------------------------------------------+
|
||||
|
||||
SELECT geohash_neighbours(37.76938, -122.3889, 11);
|
||||
|
||||
+----------------------------------------------------------------------------------------------------------+
|
||||
| geohash_neighbours(Float64(37.76938),Float64(-122.3889),Int64(11)) |
|
||||
+----------------------------------------------------------------------------------------------------------+
|
||||
| [9q8yygxnefv, 9q8yygxnefu, 9q8yygxnefs, 9q8yygxnefk, 9q8yygxnefm, 9q8yygxnefq, 9q8yygxnefw, 9q8yygxnefy] |
|
||||
+----------------------------------------------------------------------------------------------------------+
|
||||
|
||||
|
||||
@@ -64,3 +64,5 @@ SELECT geohash(37.76938, -122.3889, 11::UInt16);
|
||||
SELECT geohash(37.76938, -122.3889, 11::UInt32);
|
||||
|
||||
SELECT geohash(37.76938, -122.3889, 11::UInt64);
|
||||
|
||||
SELECT geohash_neighbours(37.76938, -122.3889, 11);
|
||||
|
||||
Reference in New Issue
Block a user