Compare commits

...

19 Commits

Author SHA1 Message Date
Ning Sun
a362c4fb22 chore: use explicit geo_json version 2022-11-01 16:12:01 +08:00
Ning Sun
20fcc7819a feat: implement postgres read/write for geo data 2022-11-01 16:03:33 +08:00
Ning Sun
8119bbd7b4 feat: update sqlparser api 2022-10-20 10:57:11 +08:00
Ning Sun
48693cb12d fix: rename wkt conversion function 2022-10-19 14:52:48 +08:00
Ning Sun
8d938c3ac8 feat: use our forked sqlparser to parse Geometry(POINT) syntax 2022-10-19 14:13:26 +08:00
liangxingjian
3235436f60 fix: format 2022-10-19 10:31:02 +08:00
liangxingjian
3bf2c9840d feat: add impl of arrow array access 2022-10-19 10:25:58 +08:00
liangxingjian
35afa9dc74 fix: fix some error 2022-10-18 15:09:45 +08:00
liangxingjian
f4d8c2cef6 feat: implement simple sql demo 2022-10-18 11:54:38 +08:00
Ning Sun
788001b4bc fix: resolve lint warnings 2022-10-18 11:32:42 +08:00
Ning Sun
92ab3002c9 fix: resolve check warnings 2022-10-18 11:27:59 +08:00
Ning Sun
36ce08cb03 refactor: set inner subtype to ConcreteDataType::Geometry 2022-10-18 11:16:14 +08:00
liangxingjian
2f159dbe22 feat:add impl of geo-vec,add some unit test 2022-10-14 18:04:12 +08:00
liangxingjian
3d7d029cb5 feat:add some impl and test with a little refactor 2022-10-13 18:12:19 +08:00
liangxingjian
7aed777bc4 feat:add iter and ref of geo types 2022-10-12 17:06:42 +08:00
liangxingjian
ebcd18d3c4 feat:add some impl of geo type 2022-10-11 19:07:15 +08:00
liangxingjian
d44887bada feat:add some geo vec impl 2022-10-10 17:52:17 +08:00
liangxingjian
c0893ac19b fix:fix some error 2022-10-10 15:23:38 +08:00
liangxingjian
8a91e26020 feat:init to add new geo types 2022-10-09 17:46:06 +08:00
26 changed files with 1388 additions and 178 deletions

739
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -30,3 +30,6 @@ members = [
"src/table-engine",
"test-util",
]
[patch.crates-io]
sqlparser = { git = "https://github.com/sunng87/sqlparser-rs.git", branch = "feature/argument-for-custom-type-for-v015" }

View File

@@ -67,6 +67,7 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
ConcreteDataType::Null(_) | ConcreteDataType::List(_) => {
return error::IntoColumnDataTypeSnafu { from: datatype }.fail()
}
ConcreteDataType::Geometry(_) => todo!(),
});
Ok(datatype)
}

View File

@@ -194,6 +194,7 @@ fn try_into_scalar_value(value: Value, datatype: &ConcreteDataType) -> Result<Sc
Value::Null => try_convert_null_value(datatype)?,
Value::List(list) => try_convert_list_value(list)?,
Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())),
Value::Geometry(_) => todo!(),
})
}

View File

@@ -186,6 +186,7 @@ fn convert_values(data_type: &ConcreteDataType, values: Values) -> Vec<Value> {
.collect(),
ConcreteDataType::Null(_) => unreachable!(),
ConcreteDataType::List(_) => unreachable!(),
ConcreteDataType::Geometry(_) => todo!(),
}
}

View File

@@ -13,6 +13,8 @@ common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
enum_dispatch = "0.3"
geo = { version = "0.23.0", features = ["use-serde"] }
geojson = "0.24"
num = "0.4"
num-traits = "0.2"
ordered-float = { version = "3.0", features = ["serde"] }
@@ -20,8 +22,18 @@ paste = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
wkt = "0.10.3"
[dependencies.arrow]
package = "arrow2"
version = "0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
features = [
"io_csv",
"io_json",
"io_parquet",
"io_parquet_compression",
"io_ipc",
"ahash",
"compute",
"serde_types",
]

View File

@@ -1,6 +1,6 @@
use arrow::array::{
self, Array, BinaryArray as ArrowBinaryArray, MutableBinaryArray as ArrowMutableBinaryArray,
MutableUtf8Array, PrimitiveArray, Utf8Array,
MutableUtf8Array, PrimitiveArray, StructArray, Utf8Array,
};
use arrow::datatypes::DataType as ArrowDataType;
use common_time::timestamp::Timestamp;
@@ -8,7 +8,7 @@ use snafu::OptionExt;
use crate::error::{ConversionSnafu, Result};
use crate::prelude::ConcreteDataType;
use crate::value::Value;
use crate::value::{GeometryValue, Value};
pub type BinaryArray = ArrowBinaryArray<i64>;
pub type MutableBinaryArray = ArrowMutableBinaryArray<i64>;
@@ -69,6 +69,28 @@ pub fn arrow_array_get(array: &dyn Array, idx: usize) -> Result<Value> {
};
Value::Timestamp(Timestamp::new(value, unit))
}
ArrowDataType::Struct(_) => {
let struct_array = array.as_any().downcast_ref::<StructArray>().unwrap();
let ref_x_array = struct_array
.values()
.get(0)
.unwrap()
.as_any()
.downcast_ref::<PrimitiveArray<f64>>()
.unwrap();
let ref_y_array = struct_array
.values()
.get(1)
.unwrap()
.as_any()
.downcast_ref::<PrimitiveArray<f64>>()
.unwrap();
let (x, y) = (ref_x_array.value(idx), ref_y_array.value(idx));
Value::Geometry(GeometryValue::new_point(x, y))
}
// TODO(sunng87): List
_ => unimplemented!("Arrow array datatype: {:?}", array.data_type()),
};

View File

@@ -8,8 +8,9 @@ use serde::{Deserialize, Serialize};
use crate::error::{self, Error, Result};
use crate::type_id::LogicalTypeId;
use crate::types::{
BinaryType, BooleanType, DateType, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
Int8Type, ListType, NullType, StringType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
BinaryType, BooleanType, DateType, Float32Type, Float64Type, GeometryType, Int16Type,
Int32Type, Int64Type, Int8Type, ListType, NullType, StringType, UInt16Type, UInt32Type,
UInt64Type, UInt8Type,
};
use crate::types::{DateTimeType, TimestampType};
use crate::value::Value;
@@ -42,6 +43,7 @@ pub enum ConcreteDataType {
Timestamp(TimestampType),
List(ListType),
Geometry(GeometryType),
}
impl ConcreteDataType {
@@ -63,6 +65,7 @@ impl ConcreteDataType {
| ConcreteDataType::Date(_)
| ConcreteDataType::DateTime(_)
| ConcreteDataType::Timestamp(_)
| ConcreteDataType::Geometry(_)
)
}
@@ -142,6 +145,7 @@ impl TryFrom<&ArrowDataType> for ConcreteDataType {
ArrowDataType::List(field) => Self::List(ListType::new(
ConcreteDataType::from_arrow_type(&field.data_type),
)),
ArrowDataType::Struct(_) => ConcreteDataType::geometry_datatype(GeometryType::Point),
_ => {
return error::UnsupportedArrowTypeSnafu {
arrow_type: dt.clone(),
@@ -186,6 +190,11 @@ impl ConcreteDataType {
ConcreteDataType::Timestamp(TimestampType::new(TimeUnit::Millisecond))
}
/// Builds the geometry data type for the given subtype.
// FIXME: specify inner type
pub fn geometry_datatype(inner_type: GeometryType) -> Self {
    let subtype = inner_type;
    ConcreteDataType::Geometry(subtype)
}
/// Converts from arrow timestamp unit to
// TODO(hl): maybe impl From<ArrowTimestamp> for our timestamp ?
pub fn from_arrow_time_unit(t: &arrow::datatypes::TimeUnit) -> Self {

View File

@@ -3,7 +3,8 @@ use std::any::Any;
use common_time::{Date, DateTime, Timestamp};
use crate::prelude::*;
use crate::value::{ListValue, ListValueRef};
use crate::value::{GeometryValue, GeometryValueRef, ListValue, ListValueRef};
use crate::vectors::all::GeometryVector;
use crate::vectors::*;
fn get_iter_capacity<T, I: Iterator<Item = T>>(iter: &I) -> usize {
@@ -340,6 +341,36 @@ impl<'a> ScalarRef<'a> for ListValueRef<'a> {
}
}
impl Scalar for GeometryValue {
    type VectorType = GeometryVector;
    type RefType<'a> = GeometryValueRef<'a>;

    /// Borrows this owned geometry as a `GeometryValueRef::Ref`.
    fn as_scalar_ref(&self) -> Self::RefType<'_> {
        let val = self;
        GeometryValueRef::Ref { val }
    }

    /// Shortens the lifetime of a reference; the value is returned as is.
    fn upcast_gat<'short, 'long: 'short>(long: Self::RefType<'long>) -> Self::RefType<'short> {
        let shortened: Self::RefType<'short> = long;
        shortened
    }
}
impl<'a> ScalarRef<'a> for GeometryValueRef<'a> {
    type VectorType = GeometryVector;
    type ScalarType = GeometryValue;

    /// Produces an owned `GeometryValue` from this reference.
    ///
    /// A `Ref` copies the referenced value; an `Indexed` reference reads the
    /// element back from its vector.
    fn to_owned_scalar(&self) -> Self::ScalarType {
        let (vector, idx) = match *self {
            GeometryValueRef::Ref { val } => return *val,
            GeometryValueRef::Indexed { vector, idx } => (vector, idx),
        };
        // Anything other than a geometry value at this position is treated as a bug.
        if let Value::Geometry(value) = vector.get(idx) {
            value
        } else {
            unreachable!()
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -31,6 +31,8 @@ pub enum LogicalTypeId {
Timestamp,
List,
Geometry,
}
impl LogicalTypeId {
@@ -42,6 +44,7 @@ impl LogicalTypeId {
#[cfg(any(test, feature = "test"))]
pub fn data_type(&self) -> crate::data_type::ConcreteDataType {
use crate::data_type::ConcreteDataType;
use crate::types::GeometryType;
match self {
LogicalTypeId::Null => ConcreteDataType::null_datatype(),
@@ -64,6 +67,7 @@ impl LogicalTypeId {
LogicalTypeId::List => {
ConcreteDataType::list_datatype(ConcreteDataType::null_datatype())
}
LogicalTypeId::Geometry => ConcreteDataType::geometry_datatype(GeometryType::default()),
}
}
}

View File

@@ -2,6 +2,7 @@ mod binary_type;
mod boolean_type;
mod date;
mod datetime;
mod geometry;
mod list_type;
mod null_type;
mod primitive_traits;
@@ -13,6 +14,7 @@ pub use binary_type::BinaryType;
pub use boolean_type::BooleanType;
pub use date::DateType;
pub use datetime::DateTimeType;
pub use geometry::GeometryType;
pub use list_type::ListType;
pub use null_type::NullType;
pub use primitive_traits::{OrdPrimitive, Primitive};

View File

@@ -0,0 +1,59 @@
use arrow::datatypes::Field;
use serde::{Deserialize, Serialize};
use crate::arrow::datatypes::DataType::Float64;
use crate::data_type::DataType;
use crate::prelude::LogicalTypeId;
use crate::value::GeometryValue;
use crate::vectors::geometry::GeometryVectorBuilder;
/// Subtype of a geometry column.
///
/// `Point` is the only subtype declared here.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum GeometryType {
/// A single point; `as_arrow_type` maps it to a struct of nullable `x`/`y` Float64 fields.
Point,
}
impl Default for GeometryType {
fn default() -> Self {
Self::Point
}
}
impl DataType for GeometryType {
    /// Returns the type name shared by all geometry subtypes.
    fn name(&self) -> &str {
        GeometryType::GEOMETRY_TYPE_NAME
    }

    fn logical_type_id(&self) -> crate::type_id::LogicalTypeId {
        LogicalTypeId::Geometry
    }

    /// The default value of a point column is the point `(0.0, 0.0)`.
    fn default_value(&self) -> crate::value::Value {
        let origin = match self {
            GeometryType::Point => GeometryValue::new_point(0.0, 0.0),
        };
        origin.to_value()
    }

    // TODO: check if unreachable
    /// Describes the arrow layout: a struct with nullable Float64 fields `x` and `y`.
    fn as_arrow_type(&self) -> arrow::datatypes::DataType {
        let coordinate_fields = ["x", "y"]
            .iter()
            .map(|axis| Field::new(*axis, Float64, true))
            .collect();
        arrow::datatypes::DataType::Struct(coordinate_fields)
    }

    /// Creates a mutable vector for this subtype with room for `capacity` elements.
    fn create_mutable_vector(&self, capacity: usize) -> Box<dyn crate::vectors::MutableVector> {
        match self {
            GeometryType::Point => {
                let builder = GeometryVectorBuilder::with_capacity_point_vector_builder(capacity);
                Box::new(builder)
            }
        }
    }
}
impl GeometryType {
    /// SQL type name of the geometry type.
    pub const GEOMETRY_TYPE_NAME: &str = "Geometry";
    /// Subtype name for points.
    pub const GEOMETRY_SUBTYPE_POINT_NAME: &str = "POINT";
    /// Subtype name for line strings.
    pub const GEOMETRY_SUBTYPE_LINESTRING_NAME: &str = "LINESTRING";
    /// Subtype name for polygons.
    pub const GEOMETRY_SUBTYPE_POLYGON_NAME: &str = "POLYGON";
}

View File

@@ -1,14 +1,24 @@
use std::cmp::Ordering;
use std::str::FromStr;
use common_base::bytes::{Bytes, StringBytes};
use common_time::date::Date;
use common_time::datetime::DateTime;
use common_time::timestamp::Timestamp;
use geo::Point;
use geojson::de::deserialize_geometry;
use geojson::ser::serialize_geometry;
pub use ordered_float::OrderedFloat;
use serde::{Deserialize, Serialize};
use wkt::Geometry;
use wkt::ToWkt;
use wkt::TryFromWkt;
use wkt::Wkt;
use crate::error::{self, Result};
use crate::prelude::*;
use crate::types::GeometryType;
use crate::vectors::all::GeometryVector;
use crate::vectors::ListVector;
pub type OrderedF32 = OrderedFloat<f32>;
@@ -44,6 +54,7 @@ pub enum Value {
Timestamp(Timestamp),
List(ListValue),
Geometry(GeometryValue),
}
impl Value {
@@ -71,6 +82,7 @@ impl Value {
Value::Date(_) => ConcreteDataType::date_datatype(),
Value::DateTime(_) => ConcreteDataType::date_datatype(),
Value::Timestamp(v) => ConcreteDataType::timestamp_datatype(v.unit()),
Value::Geometry(geom) => ConcreteDataType::geometry_datatype(geom.subtype()),
}
}
@@ -112,6 +124,7 @@ impl Value {
Value::DateTime(v) => ValueRef::DateTime(*v),
Value::List(v) => ValueRef::List(ListValueRef::Ref { val: v }),
Value::Timestamp(v) => ValueRef::Timestamp(*v),
Value::Geometry(v) => ValueRef::Geometry(GeometryValueRef::Ref { val: v }),
}
}
}
@@ -249,6 +262,7 @@ impl TryFrom<Value> for serde_json::Value {
Value::DateTime(v) => serde_json::Value::Number(v.val().into()),
Value::List(v) => serde_json::to_value(v)?,
Value::Timestamp(v) => serde_json::to_value(v.value())?,
Value::Geometry(v) => serde_json::to_value(v.to_value())?,
};
Ok(json_value)
@@ -304,6 +318,59 @@ impl Ord for ListValue {
}
}
/// Owned geometry scalar value.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Eq, Copy)]
pub enum GeometryValue {
/// A point with `OrderedF64` coordinates, (de)serialized through the
/// `geojson` `serialize_geometry` / `deserialize_geometry` helpers.
#[serde(
serialize_with = "serialize_geometry",
deserialize_with = "deserialize_geometry"
)]
Point(Point<OrderedF64>),
}
impl GeometryValue {
pub fn new_point(x: f64, y: f64) -> Self {
let point = Point::<OrderedF64>::new(x.into(), y.into());
GeometryValue::Point(point)
}
pub fn to_value(self) -> Value {
Value::Geometry(self)
}
pub fn as_ref(&self) -> GeometryValueRef {
GeometryValueRef::Ref { val: self }
}
pub fn from_wkt(s: &str) -> Result<Self> {
let wktls: Wkt<OrderedF64> = Wkt::from_str(s).unwrap();
match wktls.item {
Geometry::Point(_) => {
let p = Point::try_from_wkt_str(s).unwrap();
Ok(Self::Point(p))
}
Geometry::LineString(_) => todo!(),
Geometry::Polygon(_) => todo!(),
_ => unimplemented!(),
}
}
pub fn to_wkt(&self) -> String {
match self {
GeometryValue::Point(p) => p.wkt_string(),
}
}
pub fn subtype(&self) -> GeometryType {
match self {
Self::Point(_) => GeometryType::Point,
}
}
}
impl Default for GeometryValue {
    /// Returns the point `(0.0, 0.0)`.
    ///
    /// No subtype information is available here, so this follows the default
    /// subtype (`GeometryType::Point`) and its default value instead of
    /// panicking.
    fn default() -> Self {
        Self::new_point(0.0, 0.0)
    }
}
/// Reference to [Value].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ValueRef<'a> {
@@ -331,6 +398,7 @@ pub enum ValueRef<'a> {
DateTime(DateTime),
Timestamp(Timestamp),
List(ListValueRef<'a>),
Geometry(GeometryValueRef<'a>),
}
macro_rules! impl_as_for_value_ref {
@@ -521,6 +589,47 @@ impl<'a> PartialOrd for ListValueRef<'a> {
}
}
/// Reference to a geometry value.
#[derive(Debug, Clone, Copy)]
pub enum GeometryValueRef<'a> {
/// Refers to the element at position `idx` of `vector`.
Indexed {
vector: &'a GeometryVector,
idx: usize,
},
/// Refers directly to an owned [`GeometryValue`].
Ref {
val: &'a GeometryValue,
},
}
impl<'a> GeometryValueRef<'a> {
    /// Materializes this reference into an owned [Value], copying the data.
    fn to_value(self) -> Value {
        match self {
            GeometryValueRef::Ref { val: geometry } => Value::Geometry(*geometry),
            GeometryValueRef::Indexed {
                vector: source,
                idx: position,
            } => source.get(position),
        }
    }
}
impl<'a> PartialEq for GeometryValueRef<'a> {
    /// Two references are equal when the values they resolve to are equal.
    fn eq(&self, other: &Self) -> bool {
        let lhs = self.to_value();
        let rhs = other.to_value();
        lhs == rhs
    }
}

impl<'a> Eq for GeometryValueRef<'a> {}
// NOTE(review): no ordering is defined for geometry references. `cmp` is
// `unreachable!()`, so calling it panics.
impl<'a> Ord for GeometryValueRef<'a> {
fn cmp(&self, _other: &Self) -> Ordering {
unreachable!()
}
}
// NOTE(review): `partial_cmp` delegates to `cmp` above, so it panics as well
// instead of returning `None`.
impl<'a> PartialOrd for GeometryValueRef<'a> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -5,6 +5,7 @@ pub mod constant;
pub mod date;
pub mod datetime;
mod eq;
pub mod geometry;
mod helper;
mod list;
pub mod mutable;
@@ -17,10 +18,10 @@ mod timestamp;
pub mod all {
//! All vector types.
pub use crate::vectors::{
BinaryVector, BooleanVector, ConstantVector, DateTimeVector, DateVector, Float32Vector,
Float64Vector, Int16Vector, Int32Vector, Int64Vector, Int8Vector, ListVector, NullVector,
PrimitiveVector, StringVector, TimestampVector, UInt16Vector, UInt32Vector, UInt64Vector,
UInt8Vector,
geometry::GeometryVector, BinaryVector, BooleanVector, ConstantVector, DateTimeVector,
DateVector, Float32Vector, Float64Vector, Int16Vector, Int32Vector, Int64Vector,
Int8Vector, ListVector, NullVector, PrimitiveVector, StringVector, TimestampVector,
UInt16Vector, UInt32Vector, UInt64Vector, UInt8Vector,
};
}

View File

@@ -4,6 +4,7 @@ use common_time::date::Date;
use common_time::datetime::DateTime;
use common_time::timestamp::Timestamp;
use super::geometry::GeometryVectorBuilder;
use crate::data_type::ConcreteDataType;
use crate::error::{self, Result};
use crate::prelude::ValueRef;
@@ -41,6 +42,7 @@ pub enum VectorBuilder {
Date(DateVectorBuilder),
DateTime(DateTimeVectorBuilder),
Timestamp(TimestampVectorBuilder),
Geometry(GeometryVectorBuilder),
}
impl VectorBuilder {
@@ -99,6 +101,9 @@ impl VectorBuilder {
ConcreteDataType::Timestamp(_) => {
VectorBuilder::Timestamp(TimestampVectorBuilder::with_capacity(capacity))
}
ConcreteDataType::Geometry(_) => VectorBuilder::Geometry(
GeometryVectorBuilder::with_capacity_point_vector_builder(capacity),
),
_ => unimplemented!(),
}
}
@@ -122,6 +127,7 @@ impl VectorBuilder {
VectorBuilder::Date(b) => b.data_type(),
VectorBuilder::DateTime(b) => b.data_type(),
VectorBuilder::Timestamp(b) => b.data_type(),
VectorBuilder::Geometry(b) => b.data_type(),
}
}
@@ -153,6 +159,7 @@ impl VectorBuilder {
(VectorBuilder::Timestamp(b), Value::Int64(v)) => {
b.push(Some(Timestamp::from_millis(*v)))
}
(VectorBuilder::Geometry(b), Value::Geometry(v)) => b.push(Some(v.as_ref())),
_ => panic!(
"Value {:?} does not match builder type {:?}",
@@ -190,6 +197,7 @@ impl VectorBuilder {
VectorBuilder::Date(b) => b.push_value_ref(value),
VectorBuilder::DateTime(b) => b.push_value_ref(value),
VectorBuilder::Timestamp(b) => b.push_value_ref(value),
VectorBuilder::Geometry(b) => b.push_value_ref(value),
}
}
@@ -212,6 +220,7 @@ impl VectorBuilder {
VectorBuilder::Date(b) => b.push(None),
VectorBuilder::DateTime(b) => b.push(None),
VectorBuilder::Timestamp(b) => b.push(None),
VectorBuilder::Geometry(b) => b.push(None),
}
}
@@ -234,6 +243,7 @@ impl VectorBuilder {
VectorBuilder::Date(b) => Arc::new(b.finish()),
VectorBuilder::DateTime(b) => Arc::new(b.finish()),
VectorBuilder::Timestamp(b) => Arc::new(b.finish()),
VectorBuilder::Geometry(b) => Arc::new(b.finish()),
}
}
}

View File

@@ -76,6 +76,7 @@ fn equal(lhs: &dyn Vector, rhs: &dyn Vector) -> bool {
unreachable!("should not compare {} with {}", lhs.vector_type_name(), rhs.vector_type_name())
})
}
Geometry(_) => todo!(),
}
}

View File

@@ -0,0 +1,338 @@
use std::sync::Arc;
use arrow::array::{Array, MutableArray, StructArray};
use snafu::{OptionExt, ResultExt};
use self::point::{PointVector, PointVectorBuilder};
use super::{MutableVector, Vector};
use crate::error::SerializeSnafu;
use crate::prelude::ScalarRef;
use crate::types::GeometryType;
use crate::value::{GeometryValueRef, ValueRef};
use crate::{
data_type::ConcreteDataType,
prelude::{ScalarVector, ScalarVectorBuilder},
serialize::Serializable,
value::GeometryValue,
};
mod point;
/// Vector of geometry values, with one variant per geometry subtype.
#[derive(Debug, Clone, PartialEq)]
pub enum GeometryVector {
/// Vector of points.
PointVector(PointVector),
}
impl Vector for GeometryVector {
    /// Reports the geometry data type matching the wrapped subtype vector.
    fn data_type(&self) -> crate::data_type::ConcreteDataType {
        match self {
            Self::PointVector(_) => ConcreteDataType::geometry_datatype(GeometryType::Point),
        }
    }

    fn vector_type_name(&self) -> String {
        String::from("GeometryVector")
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Number of elements, taken from the underlying arrow array.
    fn len(&self) -> usize {
        match self {
            Self::PointVector(points) => points.array.len(),
        }
    }

    /// Clones the underlying arrow array into a shared array reference.
    fn to_arrow_array(&self) -> arrow::array::ArrayRef {
        match self {
            Self::PointVector(points) => {
                let cloned = points.array.clone();
                Arc::new(cloned)
            }
        }
    }

    /// Clones the underlying arrow array into a boxed array.
    fn to_boxed_arrow_array(&self) -> Box<dyn arrow::array::Array> {
        match self {
            Self::PointVector(points) => {
                let cloned = points.array.clone();
                Box::new(cloned)
            }
        }
    }

    fn validity(&self) -> super::Validity {
        match self {
            Self::PointVector(points) => points.validity(),
        }
    }

    fn memory_size(&self) -> usize {
        match self {
            Self::PointVector(points) => points.memory_size(),
        }
    }

    fn is_null(&self, row: usize) -> bool {
        match self {
            Self::PointVector(points) => points.array.is_null(row),
        }
    }

    /// Returns a new vector covering `length` elements starting at `offset`.
    fn slice(&self, offset: usize, length: usize) -> super::VectorRef {
        match self {
            Self::PointVector(points) => {
                let window = points.slice(offset, length);
                Arc::new(GeometryVector::PointVector(window))
            }
        }
    }

    /// Returns the owned value at `index`.
    fn get(&self, index: usize) -> crate::value::Value {
        match self {
            Self::PointVector(points) => points.get(index),
        }
    }

    /// Returns a reference to the value at `index`, or `ValueRef::Null` for a null slot.
    fn get_ref(&self, index: usize) -> crate::value::ValueRef {
        if self.is_null(index) {
            ValueRef::Null
        } else {
            let indexed = GeometryValueRef::Indexed {
                vector: self,
                idx: index,
            };
            ValueRef::Geometry(indexed)
        }
    }
}
impl ScalarVector for GeometryVector {
type OwnedItem = GeometryValue;
type RefItem<'a> = GeometryValueRef<'a>;
type Iter<'a> = GeometryVectorIter<'a>;
type Builder = GeometryVectorBuilder;
fn get_data(&self, idx: usize) -> Option<Self::RefItem<'_>> {
match self.get_ref(idx) {
ValueRef::Null => None,
ValueRef::Geometry(geo_ref) => Some(geo_ref),
_ => unreachable!(),
}
}
fn iter_data(&self) -> Self::Iter<'_> {
GeometryVectorIter {
vector: self,
pos: 0,
}
}
}
/// Iterator over the elements of a [`GeometryVector`].
pub struct GeometryVectorIter<'a> {
// Vector being iterated.
vector: &'a GeometryVector,
// Index of the next element to yield.
pos: usize,
}
impl<'a> Iterator for GeometryVectorIter<'a> {
    type Item = Option<GeometryValueRef<'a>>;

    /// Yields `Some(None)` for null slots, `Some(Some(_))` for valid ones and
    /// `None` once every element has been visited.
    fn next(&mut self) -> Option<Self::Item> {
        let current = self.pos;
        self.pos += 1;

        if current >= self.vector.len() {
            return None;
        }

        let item = if self.vector.is_null(current) {
            None
        } else {
            Some(GeometryValueRef::Indexed {
                vector: self.vector,
                idx: current,
            })
        };
        Some(item)
    }
}
/// Builder for [`GeometryVector`], with one variant per geometry subtype.
pub enum GeometryVectorBuilder {
/// Builder producing a point vector.
PointVectorBuilder(PointVectorBuilder),
}
impl GeometryVectorBuilder {
pub fn new_point_vector_builder() -> Self {
Self::PointVectorBuilder(PointVectorBuilder::new())
}
pub fn with_capacity_point_vector_builder(capacity: usize) -> Self {
Self::PointVectorBuilder(PointVectorBuilder::with_capacity(capacity))
}
}
impl MutableVector for GeometryVectorBuilder {
fn data_type(&self) -> crate::data_type::ConcreteDataType {
let subtype = match self {
Self::PointVectorBuilder(_) => GeometryType::Point,
};
ConcreteDataType::geometry_datatype(subtype)
}
fn len(&self) -> usize {
match self {
GeometryVectorBuilder::PointVectorBuilder(builder) => builder.array_x.len(),
}
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
self
}
fn to_vector(&mut self) -> super::VectorRef {
Arc::new(self.finish())
}
fn push_value_ref(&mut self, value: crate::value::ValueRef) -> crate::Result<()> {
match self {
GeometryVectorBuilder::PointVectorBuilder(builder) => match value {
ValueRef::Null => builder.push(None),
ValueRef::Geometry(geo_ref) => builder.push(Some(geo_ref.to_owned_scalar())),
_ => unreachable!(),
},
}
Ok(())
}
fn extend_slice_of(
&mut self,
vector: &dyn Vector,
offset: usize,
length: usize,
) -> crate::Result<()> {
let concrete_vector = vector
.as_any()
.downcast_ref::<GeometryVector>()
.with_context(|| crate::error::CastTypeSnafu {
msg: format!(
"Failed to cast vector from {} to {}",
vector.vector_type_name(),
stringify!(GeometryVector)
),
})?;
for idx in offset..offset + length {
let value = concrete_vector.get_ref(idx);
self.push_value_ref(value)?;
}
Ok(())
}
}
impl ScalarVectorBuilder for GeometryVectorBuilder {
    type VectorType = GeometryVector;

    /// Creates a builder with room for `capacity` elements.
    ///
    /// No subtype is passed in, so this builds the point builder, the only
    /// subtype available, instead of panicking.
    fn with_capacity(capacity: usize) -> Self {
        Self::with_capacity_point_vector_builder(capacity)
    }

    /// Appends a geometry reference, or a null when `value` is `None`.
    fn push(&mut self, value: Option<<Self::VectorType as ScalarVector>::RefItem<'_>>) {
        let value_ref = match value {
            Some(geo_ref) => ValueRef::Geometry(geo_ref),
            None => ValueRef::Null,
        };
        self.push_value_ref(value_ref)
            .expect("pushing a geometry or null reference should not fail");
    }

    /// Finishes the wrapped builder and returns the resulting vector.
    fn finish(&mut self) -> Self::VectorType {
        match self {
            GeometryVectorBuilder::PointVectorBuilder(builder) => {
                GeometryVector::PointVector(builder.finish())
            }
        }
    }
}
impl Serializable for GeometryVector {
    /// Serializes every element to JSON; null slots become `serde_json::Value::Null`.
    ///
    /// Stops at the first element that fails to serialize and returns that error.
    fn serialize_to_json(&self) -> crate::Result<Vec<serde_json::Value>> {
        let mut rows = Vec::new();
        for item in self.iter_data() {
            let json = match item {
                Some(geo_ref) => {
                    serde_json::to_value(geo_ref.to_owned_scalar()).context(SerializeSnafu)?
                }
                None => serde_json::Value::Null,
            };
            rows.push(json);
        }
        Ok(rows)
    }
}
impl GeometryVector {
pub fn try_from_arrow_array(
array: impl AsRef<dyn arrow::array::Array>,
) -> crate::error::Result<Self> {
let array = array
.as_ref()
.as_any()
.downcast_ref::<StructArray>()
.with_context(|| crate::error::ConversionSnafu {
from: std::format!("{:?}", array.as_ref().data_type()),
})?
.clone();
let pv = PointVector { array };
Ok(GeometryVector::PointVector(pv))
}
}
#[cfg(test)]
mod tests {
use super::*;
// Covers the builder, element access, iteration, slicing, `extend_slice_of`,
// memory size and JSON serialization of a two-element point vector.
#[test]
fn test_geo_point_vector() {
// Build a vector holding one point followed by one null.
let mut builder = GeometryVectorBuilder::with_capacity_point_vector_builder(0);
let value = GeometryValue::new_point(2.0, 1.0);
builder.push(Some(GeometryValueRef::Ref { val: &value }));
builder.push(None);
assert_eq!(builder.len(), 2);
let vector = builder.finish();
assert_eq!(vector.len(), 2);
// The null slot is reported as null through both accessors.
assert!(vector.get(1).is_null());
assert!(vector.get_data(1).is_none());
assert_eq!(vector.get(0), value.to_value());
assert_eq!(vector.get_data(0).unwrap().to_owned_scalar(), value);
assert_eq!(
vector.data_type(),
ConcreteDataType::geometry_datatype(GeometryType::Point)
);
// Iteration must agree with indexed access and visit every element.
let iter = vector.iter_data();
let mut cnt: usize = 0;
for i in iter {
assert_eq!(i, vector.get_data(cnt));
cnt += 1;
}
assert_eq!(cnt, vector.len());
// Copy the full slice into a second builder and compare element-wise.
let slice = vector.slice(0, 2);
let mut builder = GeometryVectorBuilder::new_point_vector_builder();
builder.extend_slice_of(slice.as_ref(), 0, 2).unwrap();
let another = builder.finish();
assert_eq!(vector.get(0), another.get(0));
assert_eq!(vector.get(1), another.get(1));
assert_eq!(vector.get_data(0), another.get_data(0));
assert_eq!(vector.memory_size(), 32); // 2 elements * 2 coordinates * 8 bytes per f64
assert_eq!(
format!("{:?}",vector.serialize_to_json().unwrap()),
"[Object {\"Point\": Object {\"type\": String(\"Point\"), \"coordinates\": Array [Number(2.0), Number(1.0)]}}, Null]".to_string()
)
}
}

View File

@@ -0,0 +1,107 @@
use arrow::array::{Array, Float64Vec, MutableArray, PrimitiveArray, StructArray};
use arrow::datatypes::DataType::{self, Float64};
use arrow::datatypes::Field;
use crate::prelude::Validity;
use crate::value::{GeometryValue, Value};
use crate::vectors::impl_validity_for_vector;
/// Vector of points backed by an arrow `StructArray`.
#[derive(Debug, Clone, PartialEq)]
pub struct PointVector {
/// Underlying struct array; `get` reads child 0 as `x` and child 1 as `y`, both as `f64` arrays.
pub array: StructArray,
}
impl PointVector {
    /// Size estimate: two `f64` coordinates per element.
    pub fn memory_size(&self) -> usize {
        let per_point = 2 * std::mem::size_of::<f64>();
        // Multiplication order matches `2 * len * size_of::<f64>()`.
        2 * self.array.len() * (per_point / 2)
    }

    pub fn validity(&self) -> Validity {
        impl_validity_for_vector!(self.array)
    }

    /// Returns a vector covering `length` elements starting at `offset`.
    pub fn slice(&self, offset: usize, length: usize) -> Self {
        let array = self.array.slice(offset, length);
        Self { array }
    }

    /// Returns the point at `index`, or `Value::Null` when the slot is null.
    pub fn get(&self, index: usize) -> Value {
        if self.array.is_null(index) {
            return Value::Null;
        }
        let xs = self.coordinate_column(0);
        let ys = self.coordinate_column(1);
        let x = xs.value(index);
        let y = ys.value(index);
        GeometryValue::new_point(x, y).to_value()
    }

    /// Fetches child array `position` of the struct array as an `f64` array.
    ///
    /// Panics when the child is missing or is not an `f64` primitive array.
    fn coordinate_column(&self, position: usize) -> &PrimitiveArray<f64> {
        self.array
            .values()
            .get(position)
            .unwrap()
            .as_any()
            .downcast_ref::<PrimitiveArray<f64>>()
            .unwrap()
    }
}
/// Builder for [`PointVector`] that collects the coordinates in two separate columns.
pub struct PointVectorBuilder {
//pub array: MutableFixedSizeListArray,
/// `x` coordinates pushed so far; a null entry marks a null point.
pub array_x: Float64Vec,
/// `y` coordinates pushed so far, kept in step with `array_x`.
pub array_y: Float64Vec,
}
impl Default for PointVectorBuilder {
    /// Creates a builder with two empty coordinate columns.
    fn default() -> Self {
        let array_x = Float64Vec::new();
        let array_y = Float64Vec::new();
        Self { array_x, array_y }
    }
}
impl PointVectorBuilder {
    /// Creates an empty builder.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a builder whose coordinate columns can hold `capacity` elements.
    pub fn with_capacity(capacity: usize) -> Self {
        let array_x = Float64Vec::with_capacity(capacity);
        let array_y = Float64Vec::with_capacity(capacity);
        Self { array_x, array_y }
    }

    /// Appends a point, or a null entry in both columns when `value` is `None`.
    pub fn push(&mut self, value: Option<GeometryValue>) {
        match value {
            None => {
                self.array_x.push_null();
                self.array_y.push_null();
            }
            Some(GeometryValue::Point(xy)) => {
                self.array_x.push(Some(*xy.x()));
                self.array_y.push(Some(*xy.y()));
            }
        }
    }

    /// Assembles the collected columns into a [`PointVector`].
    ///
    /// The struct array reuses the validity of the `x` column.
    pub fn finish(&mut self) -> PointVector {
        let x = self.array_x.as_arc();
        let y = self.array_y.as_arc();
        let validity = x.validity().cloned();
        let data_type = DataType::Struct(vec![
            Field::new("x", Float64, true),
            Field::new("y", Float64, true),
        ]);
        PointVector {
            array: StructArray::new(data_type, vec![x, y], validity),
        }
    }
}

View File

@@ -6,6 +6,7 @@ use std::sync::Arc;
use arrow::array::Array;
use arrow::datatypes::DataType as ArrowDataType;
use datafusion_common::ScalarValue;
use geometry::GeometryVector;
use snafu::OptionExt;
use crate::error::{ConversionSnafu, Result, UnknownVectorSnafu};
@@ -186,6 +187,7 @@ impl Helper {
ArrowDataType::Timestamp(_, _) => {
Arc::new(TimestampVector::try_from_arrow_array(array)?)
}
ArrowDataType::Struct(_) => Arc::new(GeometryVector::try_from_arrow_array(array)?),
_ => unimplemented!("Arrow array datatype: {:?}", array.as_ref().data_type()),
})
}

View File

@@ -2,6 +2,8 @@ mod dedup;
mod filter;
mod replicate;
use std::sync::Arc;
use arrow::bitmap::MutableBitmap;
use crate::error::Result;
@@ -119,3 +121,18 @@ where
filter::filter_non_constant!(self, PrimitiveVector<T>, filter)
}
}
// Vector operations for geometry vectors; `replicate` and `dedup` are not
// implemented yet and panic via `todo!()`.
impl VectorOp for GeometryVector {
fn replicate(&self, _offsets: &[usize]) -> VectorRef {
todo!()
}
fn dedup(&self, _selected: &mut MutableBitmap, _prev_vector: Option<&dyn Vector>) {
todo!()
}
// NOTE(review): `_filter` is never read. The result is rebuilt from the full
// arrow array, so every row is kept regardless of the filter.
fn filter(&self, _filter: &BooleanVector) -> Result<VectorRef> {
let array = self.to_arrow_array();
let v = GeometryVector::try_from_arrow_array(array)?;
Ok(Arc::new(v))
}
}

View File

@@ -46,4 +46,3 @@ streaming-stats = "0.2"
test-util = { path = "../../test-util" }
tokio = { version = "1.0", features = ["full"] }
tokio-stream = "0.1"

View File

@@ -882,6 +882,7 @@ pub fn pyobj_try_to_typed_val(
None
}
}
ConcreteDataType::Geometry(_) => todo!(),
}
} else if is_instance::<PyNone>(&obj, vm) {
// if Untyped then by default return types with highest precision
@@ -940,6 +941,7 @@ pub fn val_to_pyobj(val: value::Value, vm: &VirtualMachine) -> PyObjectRef {
// FIXME(dennis): lose the timestamp unit here
Value::Timestamp(v) => vm.ctx.new_int(v.value()).into(),
value::Value::List(_) => unreachable!(),
Value::Geometry(_) => todo!(),
}
}

View File

@@ -117,6 +117,9 @@ impl<'a, W: io::Write> MysqlResultWriter<'a, W> {
),
})
}
Value::Geometry(v) => {
row_writer.write_col(v.to_wkt())?;
}
}
}
row_writer.end_row()?;
@@ -150,6 +153,7 @@ fn create_mysql_column(column_schema: &ColumnSchema) -> Result<Column> {
Ok(ColumnType::MYSQL_TYPE_VARCHAR)
}
ConcreteDataType::Timestamp(_) => Ok(ColumnType::MYSQL_TYPE_DATETIME),
ConcreteDataType::Geometry(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
_ => error::InternalSnafu {
err_msg: format!(
"not implemented for column datatype {:?}",

View File

@@ -6,6 +6,7 @@ use common_recordbatch::{util, RecordBatch};
use common_time::timestamp::TimeUnit;
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::schema::SchemaRef;
use datatypes::types::GeometryType;
use pgwire::api::portal::Portal;
use pgwire::api::query::{ExtendedQueryHandler, SimpleQueryHandler};
use pgwire::api::results::{FieldInfo, Response, Tag, TextQueryResponseBuilder};
@@ -120,6 +121,7 @@ fn encode_value(value: &Value, builder: &mut TextQueryResponseBuilder) -> PgWire
&value
),
}))),
Value::Geometry(v) => builder.append_field(Some(v.to_wkt())),
}
}
@@ -142,6 +144,9 @@ fn type_translate(origin: &ConcreteDataType) -> Result<Type> {
err_msg: format!("not implemented for column datatype {:?}", origin),
}
.fail(),
ConcreteDataType::Geometry(geom_type) => match geom_type {
GeometryType::Point => Ok(Type::POINT),
},
}
}
@@ -160,7 +165,7 @@ mod test {
use std::sync::Arc;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::value::ListValue;
use datatypes::value::{GeometryValue, ListValue};
use pgwire::api::results::FieldInfo;
use pgwire::api::Type;
@@ -189,6 +194,11 @@ mod test {
true,
),
ColumnSchema::new("dates", ConcreteDataType::date_datatype(), true),
ColumnSchema::new(
"loc",
ConcreteDataType::geometry_datatype(GeometryType::Point),
true,
),
];
let pg_field_info = vec![
FieldInfo::new("nulls".into(), None, None, Type::UNKNOWN),
@@ -207,6 +217,7 @@ mod test {
FieldInfo::new("strings".into(), None, None, Type::VARCHAR),
FieldInfo::new("timestamps".into(), None, None, Type::TIMESTAMP),
FieldInfo::new("dates".into(), None, None, Type::DATE),
FieldInfo::new("loc".into(), None, None, Type::POINT),
];
let schema = Arc::new(Schema::new(column_schemas));
let fs = schema_to_pg(schema).unwrap();
@@ -241,6 +252,7 @@ mod test {
FieldInfo::new("dates".into(), None, None, Type::DATE),
FieldInfo::new("datetimes".into(), None, None, Type::TIMESTAMP),
FieldInfo::new("timestamps".into(), None, None, Type::TIMESTAMP),
FieldInfo::new("loc".into(), None, None, Type::POINT),
];
let values = vec![
@@ -269,6 +281,7 @@ mod test {
Value::Date(1001i32.into()),
Value::DateTime(1000001i64.into()),
Value::Timestamp(1000001i64.into()),
Value::Geometry(GeometryValue::new_point(1f64, 2f64)),
];
let mut builder = TextQueryResponseBuilder::new(schema);
for i in values {

View File

@@ -9,5 +9,5 @@ common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
snafu = { version = "0.7", features = ["backtraces"] }
sqlparser = "0.15.0"
sqlparser = { git = "https://github.com/sunng87/sqlparser-rs.git", branch = "feature/argument-for-custom-type-for-v015" }
table-engine = { path = "../table-engine" }

View File

@@ -10,8 +10,8 @@ use std::str::FromStr;
use common_time::Timestamp;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
use datatypes::types::DateTimeType;
use datatypes::value::Value;
use datatypes::types::{DateTimeType, GeometryType};
use datatypes::value::{GeometryValue, Value};
use snafu::ensure;
use crate::ast::{
@@ -93,6 +93,16 @@ fn parse_string_to_value(
.fail()
}
}
ConcreteDataType::Geometry(_) => {
if let Ok(geo_value) = GeometryValue::from_wkt(&s) {
Ok(Value::Geometry(geo_value))
} else {
ParseSqlValueSnafu {
msg: format!("Failed to parse {} to Geometry value", s),
}
.fail()
}
}
_ => {
unreachable!()
}
@@ -242,17 +252,27 @@ fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result<Concre
SqlDataType::Double => Ok(ConcreteDataType::float64_datatype()),
SqlDataType::Boolean => Ok(ConcreteDataType::boolean_datatype()),
SqlDataType::Date => Ok(ConcreteDataType::date_datatype()),
SqlDataType::Custom(obj_name) => match &obj_name.0[..] {
SqlDataType::Custom(obj_name, modifiers) => match &obj_name.0[..] {
[type_name] => {
if type_name.value.eq_ignore_ascii_case(DateTimeType::name()) {
Ok(ConcreteDataType::datetime_datatype())
} else {
return Ok(ConcreteDataType::datetime_datatype());
} else if type_name
.value
.eq_ignore_ascii_case(GeometryType::GEOMETRY_TYPE_NAME)
{
if let Some(subtype) = modifiers.get(0) {
let subtype = subtype.to_uppercase();
if subtype == GeometryType::GEOMETRY_SUBTYPE_POINT_NAME {
return Ok(ConcreteDataType::Geometry(GeometryType::Point));
}
}
}
error::SqlTypeNotSupportedSnafu {
t: data_type.clone(),
}
.fail()
}
}
_ => error::SqlTypeNotSupportedSnafu {
t: data_type.clone(),
}
@@ -307,13 +327,28 @@ mod tests {
check_type(SqlDataType::Boolean, ConcreteDataType::boolean_datatype());
check_type(SqlDataType::Date, ConcreteDataType::date_datatype());
check_type(
SqlDataType::Custom(ObjectName(vec![Ident::new("datetime")])),
SqlDataType::Custom(ObjectName(vec![Ident::new("datetime")]), vec![]),
ConcreteDataType::datetime_datatype(),
);
check_type(
SqlDataType::Timestamp,
ConcreteDataType::timestamp_millis_datatype(),
);
check_type(
SqlDataType::Custom(ObjectName(vec!["GEOMETRY".into()]), vec!["POINT".into()]),
ConcreteDataType::geometry_datatype(GeometryType::Point),
);
assert!(sql_data_type_to_concrete_data_type(&SqlDataType::Custom(
ObjectName(vec!["GEOMETRY".into()]),
vec![]
))
.is_err());
assert!(sql_data_type_to_concrete_data_type(&SqlDataType::Custom(
ObjectName(vec!["GEOMETRY".into()]),
vec!["DUMMY_TYPE".into()]
))
.is_err());
}
#[test]