From f4d8c2cef66b1d0d305bb4bdcba64f7e7023eaf5 Mon Sep 17 00:00:00 2001 From: liangxingjian <965662709@qq.com> Date: Tue, 18 Oct 2022 11:54:38 +0800 Subject: [PATCH] feat: implement simple sql demo --- .vscode/launch.json | 14 +++++++++++++ Cargo.lock | 13 ++++++++++++ src/datatypes/Cargo.toml | 5 +++-- src/datatypes/src/arrow_array.rs | 10 ++++++++- src/datatypes/src/data_type.rs | 2 ++ src/datatypes/src/types/geometry.rs | 18 ++++++++++++++-- src/datatypes/src/value.rs | 28 +++++++++++++++++++++++++ src/datatypes/src/vectors/builder.rs | 10 +++++++++ src/datatypes/src/vectors/geometry.rs | 23 +++++++++++++++++--- src/datatypes/src/vectors/helper.rs | 2 ++ src/datatypes/src/vectors/operations.rs | 6 +++++- src/servers/src/mysql/writer.rs | 5 ++++- src/sql/src/statements.rs | 17 +++++++++++++-- 13 files changed, 141 insertions(+), 12 deletions(-) create mode 100644 .vscode/launch.json diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000000..e678396f08 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,14 @@ +{ + // 使用 IntelliSense 了解相关属性。 + // 悬停以查看现有属性的描述。 + // 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "lldb", + "request": "attach", + "name": "Attach", + "pid": "${command:pickMyProcess}" // use ${command:pickProcess} to pick other users' processes + }, + ] +} \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 0dcb3f20af..3f795815c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1540,6 +1540,7 @@ dependencies = [ "serde", "serde_json", "snafu", + "wkt", ] [[package]] @@ -6502,6 +6503,18 @@ dependencies = [ "winapi", ] +[[package]] +name = "wkt" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c2252781f8927974e8ba6a67c965a759a2b88ea2b1825f6862426bbb1c8f41" +dependencies = [ + "geo-types", + "log", + "num-traits", + "thiserror", +] + [[package]] name = "wyz" version = "0.5.0" diff --git a/src/datatypes/Cargo.toml b/src/datatypes/Cargo.toml index 53940e8a47..d093e00e4f 100644 --- a/src/datatypes/Cargo.toml +++ b/src/datatypes/Cargo.toml @@ -13,6 +13,8 @@ common-error = { path = "../common/error" } common-time = { path = "../common/time" } datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } enum_dispatch = "0.3" +geo = { version = "0.23.0", features = ["use-serde"] } +geojson = "*" num = "0.4" num-traits = "0.2" ordered-float = { version = "3.0", features = ["serde"] } @@ -20,8 +22,7 @@ paste = "1.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } -geo = { version = "0.23.0", features = ["use-serde"] } -geojson = "*" +wkt = "0.10.3" [dependencies.arrow] package = "arrow2" diff --git a/src/datatypes/src/arrow_array.rs b/src/datatypes/src/arrow_array.rs index 8107754ddc..9df1f07dae 100644 --- a/src/datatypes/src/arrow_array.rs +++ b/src/datatypes/src/arrow_array.rs @@ -1,6 +1,8 @@ +use std::process::id; + use arrow::array::{ self, Array, BinaryArray as ArrowBinaryArray, MutableBinaryArray as ArrowMutableBinaryArray, - MutableUtf8Array, PrimitiveArray, Utf8Array, + MutableUtf8Array, PrimitiveArray, StructArray, Utf8Array, }; use arrow::datatypes::DataType as ArrowDataType; use common_time::timestamp::Timestamp; @@ -9,6 +11,8 @@ use snafu::OptionExt; use crate::error::{ConversionSnafu, Result}; use crate::prelude::ConcreteDataType; use crate::value::Value; +use crate::vectors::all::GeometryVector; +use crate::vectors::Vector; pub type BinaryArray = ArrowBinaryArray; pub type MutableBinaryArray = ArrowMutableBinaryArray; @@ -69,6 +73,10 @@ pub fn arrow_array_get(array: &dyn Array, idx: usize) -> Result { }; Value::Timestamp(Timestamp::new(value, unit)) } + ArrowDataType::Struct(_) => { + let k = cast_array!(array, StructArray).values().get(0).unwrap(); + Value::Geometry(crate::value::GeometryValue::new_point(0.into(), 0.into())) + } // TODO(sunng87): List _ => unimplemented!("Arrow array datatype: {:?}", array.data_type()), }; diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index dfcaa497b8..a4f972ce58 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -65,6 +65,7 @@ impl ConcreteDataType { | ConcreteDataType::Date(_) | ConcreteDataType::DateTime(_) | ConcreteDataType::Timestamp(_) + | ConcreteDataType::Geometry(_) ) } @@ -144,6 +145,7 @@ impl TryFrom<&ArrowDataType> for ConcreteDataType { ArrowDataType::List(field) => Self::List(ListType::new( ConcreteDataType::from_arrow_type(&field.data_type), )), + ArrowDataType::Struct(_) => ConcreteDataType::geometry_datatype(), _ => { return error::UnsupportedArrowTypeSnafu { arrow_type: dt.clone(), diff --git a/src/datatypes/src/types/geometry.rs b/src/datatypes/src/types/geometry.rs index bcb41ce55d..7d3de31894 100644 --- a/src/datatypes/src/types/geometry.rs +++ b/src/datatypes/src/types/geometry.rs @@ -1,10 +1,14 @@ +use arrow::datatypes::Field; use serde::{Deserialize, Serialize}; +use crate::arrow::datatypes::DataType::Float64; use crate::data_type::DataType; use crate::prelude::{DataTypeRef, LogicalTypeId, Value}; use crate::value::GeometryValue; use crate::vectors::geometry::GeometryVectorBuilder; +const GEOMETRY_TYPE_NAME: &str = "Geometry"; + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum GeometryType { Point, @@ -12,7 +16,7 @@ pub enum GeometryType { impl DataType for GeometryType { fn name(&self) -> &str { - "Geometry" + GEOMETRY_TYPE_NAME } fn logical_type_id(&self) -> crate::type_id::LogicalTypeId { @@ -26,7 +30,11 @@ impl DataType for GeometryType { } fn as_arrow_type(&self) -> arrow::datatypes::DataType { - unreachable!() + let fields = vec![ + Field::new("x", Float64, true), + Field::new("y", Float64, true), + ]; + arrow::datatypes::DataType::Struct(fields) } fn create_mutable_vector(&self, capacity: usize) -> Box { @@ -37,3 +45,9 @@ impl DataType for GeometryType { } } } + +impl GeometryType { + pub fn name() -> &'static str { + GEOMETRY_TYPE_NAME + } +} diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index 3d5a2fedcf..7014c49e3f 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -1,4 +1,5 @@ use std::cmp::Ordering; +use std::str::FromStr; use common_base::bytes::{Bytes, StringBytes}; use common_time::date::Date; @@ -9,6 +10,10 @@ use geojson::de::deserialize_geometry; use geojson::ser::serialize_geometry; pub use ordered_float::OrderedFloat; use serde::{Deserialize, Serialize}; +use wkt::Geometry; +use wkt::ToWkt; +use wkt::TryFromWkt; +use wkt::Wkt; use crate::error::{self, Result}; use crate::prelude::*; @@ -329,6 +334,29 @@ impl GeometryValue { pub fn to_value(self) -> Value { Value::Geometry(self) } + + pub fn as_ref(&self) -> GeometryValueRef { + GeometryValueRef::Ref { val: self } + } + + pub fn from_str(s: &str) -> Result { + let wktls: Wkt = Wkt::from_str(s).unwrap(); + match wktls.item { + Geometry::Point(_) => { + let p = Point::try_from_wkt_str(s).unwrap(); + Ok(Self::Point(p)) + } + Geometry::LineString(_) => todo!(), + Geometry::Polygon(_) => todo!(), + _ => unimplemented!(), + } + } + + pub fn to_wkb(&self) -> String { + match self { + GeometryValue::Point(p) => p.wkt_string(), + } + } } impl Default for GeometryValue { diff --git a/src/datatypes/src/vectors/builder.rs b/src/datatypes/src/vectors/builder.rs index 5e97203b00..8da74283d8 100644 --- a/src/datatypes/src/vectors/builder.rs +++ b/src/datatypes/src/vectors/builder.rs @@ -4,6 +4,7 @@ use common_time::date::Date; use common_time::datetime::DateTime; use common_time::timestamp::Timestamp; +use super::geometry::GeometryVectorBuilder; use crate::data_type::ConcreteDataType; use crate::error::{self, Result}; use crate::prelude::ValueRef; @@ -41,6 +42,7 @@ pub enum VectorBuilder { Date(DateVectorBuilder), DateTime(DateTimeVectorBuilder), Timestamp(TimestampVectorBuilder), + Geometry(GeometryVectorBuilder), } impl VectorBuilder { @@ -99,6 +101,9 @@ impl VectorBuilder { ConcreteDataType::Timestamp(_) => { VectorBuilder::Timestamp(TimestampVectorBuilder::with_capacity(capacity)) } + ConcreteDataType::Geometry(_) => VectorBuilder::Geometry( + GeometryVectorBuilder::with_capacity_point_vector_builder(capacity), + ), _ => unimplemented!(), } } @@ -122,6 +127,7 @@ impl VectorBuilder { VectorBuilder::Date(b) => b.data_type(), VectorBuilder::DateTime(b) => b.data_type(), VectorBuilder::Timestamp(b) => b.data_type(), + VectorBuilder::Geometry(b) => b.data_type(), } } @@ -153,6 +159,7 @@ impl VectorBuilder { (VectorBuilder::Timestamp(b), Value::Int64(v)) => { b.push(Some(Timestamp::from_millis(*v))) } + (VectorBuilder::Geometry(b), Value::Geometry(v)) => b.push(Some(v.as_ref())), _ => panic!( "Value {:?} does not match builder type {:?}", @@ -190,6 +197,7 @@ impl VectorBuilder { VectorBuilder::Date(b) => b.push_value_ref(value), VectorBuilder::DateTime(b) => b.push_value_ref(value), VectorBuilder::Timestamp(b) => b.push_value_ref(value), + VectorBuilder::Geometry(b) => b.push_value_ref(value), } } @@ -212,6 +220,7 @@ impl VectorBuilder { VectorBuilder::Date(b) => b.push(None), VectorBuilder::DateTime(b) => b.push(None), VectorBuilder::Timestamp(b) => b.push(None), + VectorBuilder::Geometry(b) => b.push(None), } } @@ -234,6 +243,7 @@ impl VectorBuilder { VectorBuilder::Date(b) => Arc::new(b.finish()), VectorBuilder::DateTime(b) => Arc::new(b.finish()), VectorBuilder::Timestamp(b) => Arc::new(b.finish()), + VectorBuilder::Geometry(b) => Arc::new(b.finish()), } } } diff --git a/src/datatypes/src/vectors/geometry.rs b/src/datatypes/src/vectors/geometry.rs index 70281dace3..990d42fb7e 100644 --- a/src/datatypes/src/vectors/geometry.rs +++ b/src/datatypes/src/vectors/geometry.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use arrow::array::{Array, MutableArray}; +use arrow::array::{Array, MutableArray, StructArray}; use snafu::{ensure, OptionExt, ResultExt}; use self::point::{PointVector, PointVectorBuilder}; @@ -253,6 +253,25 @@ impl Serializable for GeometryVector { } } +impl GeometryVector { + pub fn try_from_arrow_array( + array: impl AsRef, + ) -> crate::error::Result { + let array = array + .as_ref() + .as_any() + .downcast_ref::() + .with_context(|| crate::error::ConversionSnafu { + from: std::format!("{:?}", array.as_ref().data_type()), + })? + .clone(); + + let pv = PointVector { array }; + + Ok(GeometryVector::PointVector(pv)) + } +} + #[cfg(test)] mod tests { use super::*; @@ -301,11 +320,9 @@ mod tests { assert_eq!(vector.memory_size(), 32); //2 elements (f64,f64)=2*2*8 - assert_eq!( format!("{:?}",vector.serialize_to_json().unwrap()), "[Object {\"Point\": Object {\"type\": String(\"Point\"), \"coordinates\": Array [Number(2.0), Number(1.0)]}}, Null]".to_string() ) - } } diff --git a/src/datatypes/src/vectors/helper.rs b/src/datatypes/src/vectors/helper.rs index 98a2bf042d..bb4b915b81 100644 --- a/src/datatypes/src/vectors/helper.rs +++ b/src/datatypes/src/vectors/helper.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use arrow::array::Array; use arrow::datatypes::DataType as ArrowDataType; use datafusion_common::ScalarValue; +use geometry::GeometryVector; use snafu::OptionExt; use crate::error::{ConversionSnafu, Result, UnknownVectorSnafu}; @@ -186,6 +187,7 @@ impl Helper { ArrowDataType::Timestamp(_, _) => { Arc::new(TimestampVector::try_from_arrow_array(array)?) } + ArrowDataType::Struct(_) => Arc::new(GeometryVector::try_from_arrow_array(array)?), _ => unimplemented!("Arrow array datatype: {:?}", array.as_ref().data_type()), }) } diff --git a/src/datatypes/src/vectors/operations.rs b/src/datatypes/src/vectors/operations.rs index a9dabbb177..055aa6a5cd 100644 --- a/src/datatypes/src/vectors/operations.rs +++ b/src/datatypes/src/vectors/operations.rs @@ -2,6 +2,8 @@ mod dedup; mod filter; mod replicate; +use std::sync::Arc; + use arrow::bitmap::MutableBitmap; use crate::error::Result; @@ -129,6 +131,8 @@ impl VectorOp for GeometryVector { } fn filter(&self, filter: &BooleanVector) -> Result { - todo!() + let array = self.to_arrow_array(); + let v = GeometryVector::try_from_arrow_array(array)?; + return Ok(Arc::new(v)); } } diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index d524441f4b..713fd6843f 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -117,7 +117,9 @@ impl<'a, W: io::Write> MysqlResultWriter<'a, W> { ), }) } - Value::Geometry(_) => todo!(), + Value::Geometry(v) => { + row_writer.write_col(v.to_wkb())?; + } } } row_writer.end_row()?; @@ -151,6 +153,7 @@ fn create_mysql_column(column_schema: &ColumnSchema) -> Result { Ok(ColumnType::MYSQL_TYPE_VARCHAR) } ConcreteDataType::Timestamp(_) => Ok(ColumnType::MYSQL_TYPE_DATETIME), + ConcreteDataType::Geometry(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR), _ => error::InternalSnafu { err_msg: format!( "not implemented for column datatype {:?}", diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index 4921bf1bd0..fcff941172 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -8,10 +8,11 @@ pub mod statement; use std::str::FromStr; use common_time::Timestamp; +use datatypes::data_type::DataType; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema}; -use datatypes::types::DateTimeType; -use datatypes::value::Value; +use datatypes::types::{DateTimeType, GeometryType}; +use datatypes::value::{GeometryValue, Value}; use snafu::ensure; use crate::ast::{ @@ -93,6 +94,16 @@ fn parse_string_to_value( .fail() } } + ConcreteDataType::Geometry(t) => { + if let Ok(geo_value) = GeometryValue::from_str(&s) { + Ok(Value::Geometry(geo_value)) + } else { + ParseSqlValueSnafu { + msg: format!("Failed to parse {} to Geometry value", s), + } + .fail() + } + } _ => { unreachable!() } @@ -246,6 +257,8 @@ fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result { if type_name.value.eq_ignore_ascii_case(DateTimeType::name()) { Ok(ConcreteDataType::datetime_datatype()) + } else if type_name.value.eq_ignore_ascii_case(GeometryType::name()) { + Ok(ConcreteDataType::geometry_datatype()) } else { error::SqlTypeNotSupportedSnafu { t: data_type.clone(),