feat: implement simple sql demo

This commit is contained in:
liangxingjian
2022-10-18 11:54:38 +08:00
parent 2f159dbe22
commit f4d8c2cef6
13 changed files with 141 additions and 12 deletions

14
.vscode/launch.json vendored Normal file
View File

@@ -0,0 +1,14 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "lldb",
"request": "attach",
"name": "Attach",
"pid": "${command:pickMyProcess}" // use ${command:pickProcess} to pick other users' processes
},
]
}

13
Cargo.lock generated
View File

@@ -1540,6 +1540,7 @@ dependencies = [
"serde",
"serde_json",
"snafu",
"wkt",
]
[[package]]
@@ -6502,6 +6503,18 @@ dependencies = [
"winapi",
]
[[package]]
name = "wkt"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3c2252781f8927974e8ba6a67c965a759a2b88ea2b1825f6862426bbb1c8f41"
dependencies = [
"geo-types",
"log",
"num-traits",
"thiserror",
]
[[package]]
name = "wyz"
version = "0.5.0"

View File

@@ -13,6 +13,8 @@ common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
enum_dispatch = "0.3"
geo = { version = "0.23.0", features = ["use-serde"] }
geojson = "*"
num = "0.4"
num-traits = "0.2"
ordered-float = { version = "3.0", features = ["serde"] }
@@ -20,8 +22,7 @@ paste = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
geo = { version = "0.23.0", features = ["use-serde"] }
geojson = "*"
wkt = "0.10.3"
[dependencies.arrow]
package = "arrow2"

View File

@@ -1,6 +1,8 @@
use std::process::id;
use arrow::array::{
self, Array, BinaryArray as ArrowBinaryArray, MutableBinaryArray as ArrowMutableBinaryArray,
MutableUtf8Array, PrimitiveArray, Utf8Array,
MutableUtf8Array, PrimitiveArray, StructArray, Utf8Array,
};
use arrow::datatypes::DataType as ArrowDataType;
use common_time::timestamp::Timestamp;
@@ -9,6 +11,8 @@ use snafu::OptionExt;
use crate::error::{ConversionSnafu, Result};
use crate::prelude::ConcreteDataType;
use crate::value::Value;
use crate::vectors::all::GeometryVector;
use crate::vectors::Vector;
pub type BinaryArray = ArrowBinaryArray<i64>;
pub type MutableBinaryArray = ArrowMutableBinaryArray<i64>;
@@ -69,6 +73,10 @@ pub fn arrow_array_get(array: &dyn Array, idx: usize) -> Result<Value> {
};
Value::Timestamp(Timestamp::new(value, unit))
}
ArrowDataType::Struct(_) => {
let k = cast_array!(array, StructArray).values().get(0).unwrap();
Value::Geometry(crate::value::GeometryValue::new_point(0.into(), 0.into()))
}
// TODO(sunng87): List
_ => unimplemented!("Arrow array datatype: {:?}", array.data_type()),
};

View File

@@ -65,6 +65,7 @@ impl ConcreteDataType {
| ConcreteDataType::Date(_)
| ConcreteDataType::DateTime(_)
| ConcreteDataType::Timestamp(_)
| ConcreteDataType::Geometry(_)
)
}
@@ -144,6 +145,7 @@ impl TryFrom<&ArrowDataType> for ConcreteDataType {
ArrowDataType::List(field) => Self::List(ListType::new(
ConcreteDataType::from_arrow_type(&field.data_type),
)),
ArrowDataType::Struct(_) => ConcreteDataType::geometry_datatype(),
_ => {
return error::UnsupportedArrowTypeSnafu {
arrow_type: dt.clone(),

View File

@@ -1,10 +1,14 @@
use arrow::datatypes::Field;
use serde::{Deserialize, Serialize};
use crate::arrow::datatypes::DataType::Float64;
use crate::data_type::DataType;
use crate::prelude::{DataTypeRef, LogicalTypeId, Value};
use crate::value::GeometryValue;
use crate::vectors::geometry::GeometryVectorBuilder;
const GEOMETRY_TYPE_NAME: &str = "Geometry";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum GeometryType {
Point,
@@ -12,7 +16,7 @@ pub enum GeometryType {
impl DataType for GeometryType {
/// Returns the display name of this data type.
// NOTE(review): both the string literal and `GEOMETRY_TYPE_NAME` appear in
// this body; this looks like the removed and added lines of a diff shown
// together. Presumably only `GEOMETRY_TYPE_NAME` should remain — confirm
// against the real file.
fn name(&self) -> &str {
"Geometry"
GEOMETRY_TYPE_NAME
}
fn logical_type_id(&self) -> crate::type_id::LogicalTypeId {
@@ -26,7 +30,11 @@ impl DataType for GeometryType {
}
/// Returns the Arrow data type used to represent this geometry type: a
/// `Struct` with two `Float64` fields named "x" and "y", both created with
/// `true` as the third `Field::new` argument.
// NOTE(review): the `unreachable!()` line below looks like the pre-change
// body left in by the diff rendering. As written it would run before the
// struct fields are built — confirm against the real file.
fn as_arrow_type(&self) -> arrow::datatypes::DataType {
unreachable!()
let fields = vec![
Field::new("x", Float64, true),
Field::new("y", Float64, true),
];
arrow::datatypes::DataType::Struct(fields)
}
fn create_mutable_vector(&self, capacity: usize) -> Box<dyn crate::vectors::MutableVector> {
@@ -37,3 +45,9 @@ impl DataType for GeometryType {
}
}
}
impl GeometryType {
/// Returns the type name (`GEOMETRY_TYPE_NAME`) without needing an instance.
///
/// This is an associated function, distinct from the `name(&self)` method
/// of the `DataType` trait implementation.
pub fn name() -> &'static str {
GEOMETRY_TYPE_NAME
}
}

View File

@@ -1,4 +1,5 @@
use std::cmp::Ordering;
use std::str::FromStr;
use common_base::bytes::{Bytes, StringBytes};
use common_time::date::Date;
@@ -9,6 +10,10 @@ use geojson::de::deserialize_geometry;
use geojson::ser::serialize_geometry;
pub use ordered_float::OrderedFloat;
use serde::{Deserialize, Serialize};
use wkt::Geometry;
use wkt::ToWkt;
use wkt::TryFromWkt;
use wkt::Wkt;
use crate::error::{self, Result};
use crate::prelude::*;
@@ -329,6 +334,29 @@ impl GeometryValue {
/// Consumes this geometry and wraps it in `Value::Geometry`.
pub fn to_value(self) -> Value {
Value::Geometry(self)
}
/// Borrows this geometry as a `GeometryValueRef::Ref` pointing at `self`.
///
/// This is an inherent method; it is not an implementation of the standard
/// `AsRef` trait even though it shares the name.
pub fn as_ref(&self) -> GeometryValueRef {
GeometryValueRef::Ref { val: self }
}
pub fn from_str(s: &str) -> Result<Self> {
let wktls: Wkt<OrderedF64> = Wkt::from_str(s).unwrap();
match wktls.item {
Geometry::Point(_) => {
let p = Point::try_from_wkt_str(s).unwrap();
Ok(Self::Point(p))
}
Geometry::LineString(_) => todo!(),
Geometry::Polygon(_) => todo!(),
_ => unimplemented!(),
}
}
pub fn to_wkb(&self) -> String {
match self {
GeometryValue::Point(p) => p.wkt_string(),
}
}
}
impl Default for GeometryValue {

View File

@@ -4,6 +4,7 @@ use common_time::date::Date;
use common_time::datetime::DateTime;
use common_time::timestamp::Timestamp;
use super::geometry::GeometryVectorBuilder;
use crate::data_type::ConcreteDataType;
use crate::error::{self, Result};
use crate::prelude::ValueRef;
@@ -41,6 +42,7 @@ pub enum VectorBuilder {
Date(DateVectorBuilder),
DateTime(DateTimeVectorBuilder),
Timestamp(TimestampVectorBuilder),
Geometry(GeometryVectorBuilder),
}
impl VectorBuilder {
@@ -99,6 +101,9 @@ impl VectorBuilder {
ConcreteDataType::Timestamp(_) => {
VectorBuilder::Timestamp(TimestampVectorBuilder::with_capacity(capacity))
}
ConcreteDataType::Geometry(_) => VectorBuilder::Geometry(
GeometryVectorBuilder::with_capacity_point_vector_builder(capacity),
),
_ => unimplemented!(),
}
}
@@ -122,6 +127,7 @@ impl VectorBuilder {
VectorBuilder::Date(b) => b.data_type(),
VectorBuilder::DateTime(b) => b.data_type(),
VectorBuilder::Timestamp(b) => b.data_type(),
VectorBuilder::Geometry(b) => b.data_type(),
}
}
@@ -153,6 +159,7 @@ impl VectorBuilder {
(VectorBuilder::Timestamp(b), Value::Int64(v)) => {
b.push(Some(Timestamp::from_millis(*v)))
}
(VectorBuilder::Geometry(b), Value::Geometry(v)) => b.push(Some(v.as_ref())),
_ => panic!(
"Value {:?} does not match builder type {:?}",
@@ -190,6 +197,7 @@ impl VectorBuilder {
VectorBuilder::Date(b) => b.push_value_ref(value),
VectorBuilder::DateTime(b) => b.push_value_ref(value),
VectorBuilder::Timestamp(b) => b.push_value_ref(value),
VectorBuilder::Geometry(b) => b.push_value_ref(value),
}
}
@@ -212,6 +220,7 @@ impl VectorBuilder {
VectorBuilder::Date(b) => b.push(None),
VectorBuilder::DateTime(b) => b.push(None),
VectorBuilder::Timestamp(b) => b.push(None),
VectorBuilder::Geometry(b) => b.push(None),
}
}
@@ -234,6 +243,7 @@ impl VectorBuilder {
VectorBuilder::Date(b) => Arc::new(b.finish()),
VectorBuilder::DateTime(b) => Arc::new(b.finish()),
VectorBuilder::Timestamp(b) => Arc::new(b.finish()),
VectorBuilder::Geometry(b) => Arc::new(b.finish()),
}
}
}

View File

@@ -1,6 +1,6 @@
use std::sync::Arc;
use arrow::array::{Array, MutableArray};
use arrow::array::{Array, MutableArray, StructArray};
use snafu::{ensure, OptionExt, ResultExt};
use self::point::{PointVector, PointVectorBuilder};
@@ -253,6 +253,25 @@ impl Serializable for GeometryVector {
}
}
impl GeometryVector {
    /// Builds a [`GeometryVector`] from an Arrow array.
    ///
    /// The array is downcast to a `StructArray`, cloned, and wrapped in a
    /// `PointVector`.
    ///
    /// # Errors
    ///
    /// Returns a conversion error carrying the array's data type when the
    /// array is not a `StructArray`.
    pub fn try_from_arrow_array(
        array: impl AsRef<dyn arrow::array::Array>,
    ) -> crate::error::Result<Self> {
        let dyn_array = array.as_ref();
        let struct_array = dyn_array
            .as_any()
            .downcast_ref::<StructArray>()
            .with_context(|| crate::error::ConversionSnafu {
                from: std::format!("{:?}", dyn_array.data_type()),
            })?;

        let points = PointVector {
            array: struct_array.clone(),
        };
        Ok(Self::PointVector(points))
    }
}
#[cfg(test)]
mod tests {
use super::*;
@@ -301,11 +320,9 @@ mod tests {
assert_eq!(vector.memory_size(), 32); //2 elements (f64,f64)=2*2*8
assert_eq!(
format!("{:?}",vector.serialize_to_json().unwrap()),
"[Object {\"Point\": Object {\"type\": String(\"Point\"), \"coordinates\": Array [Number(2.0), Number(1.0)]}}, Null]".to_string()
)
}
}

View File

@@ -6,6 +6,7 @@ use std::sync::Arc;
use arrow::array::Array;
use arrow::datatypes::DataType as ArrowDataType;
use datafusion_common::ScalarValue;
use geometry::GeometryVector;
use snafu::OptionExt;
use crate::error::{ConversionSnafu, Result, UnknownVectorSnafu};
@@ -186,6 +187,7 @@ impl Helper {
ArrowDataType::Timestamp(_, _) => {
Arc::new(TimestampVector::try_from_arrow_array(array)?)
}
ArrowDataType::Struct(_) => Arc::new(GeometryVector::try_from_arrow_array(array)?),
_ => unimplemented!("Arrow array datatype: {:?}", array.as_ref().data_type()),
})
}

View File

@@ -2,6 +2,8 @@ mod dedup;
mod filter;
mod replicate;
use std::sync::Arc;
use arrow::bitmap::MutableBitmap;
use crate::error::Result;
@@ -129,6 +131,8 @@ impl VectorOp for GeometryVector {
}
/// Rebuilds this vector from its own Arrow array and returns it as a
/// `VectorRef`.
// NOTE(review): the `filter` argument is never read below, so every row is
// returned regardless of the mask — the boolean mask presumably still needs
// to be applied to the Arrow array before the vector is rebuilt.
// NOTE(review): the `todo!()` line looks like the pre-change body left in by
// the diff rendering — confirm against the real file.
fn filter(&self, filter: &BooleanVector) -> Result<VectorRef> {
todo!()
let array = self.to_arrow_array();
let v = GeometryVector::try_from_arrow_array(array)?;
return Ok(Arc::new(v));
}
}

View File

@@ -117,7 +117,9 @@ impl<'a, W: io::Write> MysqlResultWriter<'a, W> {
),
})
}
Value::Geometry(_) => todo!(),
Value::Geometry(v) => {
row_writer.write_col(v.to_wkb())?;
}
}
}
row_writer.end_row()?;
@@ -151,6 +153,7 @@ fn create_mysql_column(column_schema: &ColumnSchema) -> Result<Column> {
Ok(ColumnType::MYSQL_TYPE_VARCHAR)
}
ConcreteDataType::Timestamp(_) => Ok(ColumnType::MYSQL_TYPE_DATETIME),
ConcreteDataType::Geometry(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
_ => error::InternalSnafu {
err_msg: format!(
"not implemented for column datatype {:?}",

View File

@@ -8,10 +8,11 @@ pub mod statement;
use std::str::FromStr;
use common_time::Timestamp;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
use datatypes::types::DateTimeType;
use datatypes::value::Value;
use datatypes::types::{DateTimeType, GeometryType};
use datatypes::value::{GeometryValue, Value};
use snafu::ensure;
use crate::ast::{
@@ -93,6 +94,16 @@ fn parse_string_to_value(
.fail()
}
}
ConcreteDataType::Geometry(t) => {
if let Ok(geo_value) = GeometryValue::from_str(&s) {
Ok(Value::Geometry(geo_value))
} else {
ParseSqlValueSnafu {
msg: format!("Failed to parse {} to Geometry value", s),
}
.fail()
}
}
_ => {
unreachable!()
}
@@ -246,6 +257,8 @@ fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result<Concre
[type_name] => {
if type_name.value.eq_ignore_ascii_case(DateTimeType::name()) {
Ok(ConcreteDataType::datetime_datatype())
} else if type_name.value.eq_ignore_ascii_case(GeometryType::name()) {
Ok(ConcreteDataType::geometry_datatype())
} else {
error::SqlTypeNotSupportedSnafu {
t: data_type.clone(),