diff --git a/Cargo.lock b/Cargo.lock index 7e320880d2..a544da4087 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1041,7 +1041,7 @@ dependencies = [ "bitflags 2.6.0", "cexpr", "clang-sys", - "itertools 0.12.1", + "itertools 0.10.5", "lazy_static", "lazycell", "log", @@ -4583,7 +4583,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=67bb1d52bc1241972c368657e658592b1be7ead3#67bb1d52bc1241972c368657e658592b1be7ead3" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=75c5fb569183bb3d0fa1023df9c2214df722b9b1#75c5fb569183bb3d0fa1023df9c2214df722b9b1" dependencies = [ "prost 0.12.6", "serde", @@ -5079,7 +5079,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.7", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -6069,7 +6069,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -8810,7 +8810,7 @@ checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes", "heck 0.5.0", - "itertools 0.12.1", + "itertools 0.10.5", "log", "multimap", "once_cell", @@ -8862,7 +8862,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.12.1", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.79", @@ -9024,7 +9024,7 @@ dependencies = [ "indoc", "libc", "memoffset 0.9.1", - "parking_lot 0.12.3", + "parking_lot 0.11.2", "portable-atomic", "pyo3-build-config", "pyo3-ffi", @@ -12359,6 +12359,7 @@ dependencies = [ "frontend", "futures", "futures-util", + "hex", "itertools 0.10.5", "meta-client", "meta-srv", @@ -13968,7 +13969,7 @@ version = "0.1.9" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 6afb04634d..192815c187 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -117,11 +117,12 @@ datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" } derive_builder = "0.12" dotenv = "0.15" -etcd-client = { version = "0.13" } +etcd-client = "0.13" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "67bb1d52bc1241972c368657e658592b1be7ead3" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "75c5fb569183bb3d0fa1023df9c2214df722b9b1" } +hex = "0.4" humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" @@ -151,7 +152,7 @@ raft-engine = { version = "0.4.1", default-features = false } rand = "0.8" ratelimit = "0.9" regex = "1.8" -regex-automata = { version = "0.4" } +regex-automata = "0.4" reqwest = { version = "0.12", default-features = false, features = [ "json", "rustls-tls-native-roots", @@ -182,11 +183,11 @@ strum = { version = "0.25", features = ["derive"] } tempfile = "3" tokio = { version = "1.40", features = ["full"] } tokio-postgres = "0.7" -tokio-stream = { version = "0.1" } +tokio-stream = "0.1" tokio-util = { version = "0.7", features = ["io-util", "compat"] } toml = "0.8.8" tonic = { version = "0.11", features = ["tls", "gzip", "zstd"] } -tower = { version = "0.4" } +tower = "0.4" tracing-appender = "0.2" tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt"] } typetag = "0.2" diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 9d8bbacff9..64baae1187 100644 --- a/src/api/src/helper.rs +++ 
b/src/api/src/helper.rs @@ -36,15 +36,14 @@ use datatypes::vectors::{ TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, VectorRef, }; -use greptime_proto::v1; use greptime_proto::v1::column_data_type_extension::TypeExt; use greptime_proto::v1::ddl_request::Expr; use greptime_proto::v1::greptime_request::Request; use greptime_proto::v1::query_request::Query; use greptime_proto::v1::value::ValueData; use greptime_proto::v1::{ - ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, JsonTypeExtension, QueryRequest, - Row, SemanticType, + self, ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, JsonTypeExtension, + QueryRequest, Row, SemanticType, VectorTypeExtension, }; use paste::paste; use snafu::prelude::*; @@ -150,6 +149,17 @@ impl From for ConcreteDataType { ConcreteDataType::decimal128_default_datatype() } } + ColumnDataType::Vector => { + if let Some(TypeExt::VectorType(d)) = datatype_wrapper + .datatype_ext + .as_ref() + .and_then(|datatype_ext| datatype_ext.type_ext.as_ref()) + { + ConcreteDataType::vector_datatype(d.dim) + } else { + ConcreteDataType::vector_default_datatype() + } + } } } } @@ -231,6 +241,15 @@ impl ColumnDataTypeWrapper { }), } } + + pub fn vector_datatype(dim: u32) -> Self { + ColumnDataTypeWrapper { + datatype: ColumnDataType::Vector, + datatype_ext: Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::VectorType(VectorTypeExtension { dim })), + }), + } + } } impl TryFrom for ColumnDataTypeWrapper { @@ -271,6 +290,7 @@ impl TryFrom for ColumnDataTypeWrapper { IntervalType::MonthDayNano(_) => ColumnDataType::IntervalMonthDayNano, }, ConcreteDataType::Decimal128(_) => ColumnDataType::Decimal128, + ConcreteDataType::Vector(_) => ColumnDataType::Vector, ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) @@ -299,6 +319,15 @@ impl TryFrom for ColumnDataTypeWrapper { None } } + ColumnDataType::Vector => { + datatype + .as_vector() + 
.map(|vector_type| ColumnDataTypeExtension { + type_ext: Some(TypeExt::VectorType(VectorTypeExtension { + dim: vector_type.dim as _, + })), + }) + } _ => None, }; Ok(Self { @@ -422,6 +451,10 @@ pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values string_values: Vec::with_capacity(capacity), ..Default::default() }, + ColumnDataType::Vector => Values { + binary_values: Vec::with_capacity(capacity), + ..Default::default() + }, } } @@ -673,6 +706,7 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) -> Decimal128::from_value_precision_scale(x.hi, x.lo, d.precision(), d.scale()).into() }), )), + ConcreteDataType::Vector(_) => Arc::new(BinaryVector::from_vec(values.binary_values)), ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) @@ -838,6 +872,7 @@ pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec< )) }) .collect(), + ConcreteDataType::Vector(_) => values.binary_values.into_iter().map(|v| v.into()).collect(), ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) @@ -862,10 +897,7 @@ pub fn is_column_type_value_eq( ColumnDataTypeWrapper::try_new(type_value, type_extension) .map(|wrapper| { let datatype = ConcreteDataType::from(wrapper); - (datatype == *expect_type) - // Json type leverage binary type in pb, so this is valid. 
- || (datatype == ConcreteDataType::binary_datatype() - && *expect_type == ConcreteDataType::json_datatype()) + expect_type == &datatype }) .unwrap_or(false) } @@ -1152,6 +1184,10 @@ mod tests { let values = values_with_capacity(ColumnDataType::Decimal128, 2); let values = values.decimal128_values; assert_eq!(2, values.capacity()); + + let values = values_with_capacity(ColumnDataType::Vector, 2); + let values = values.binary_values; + assert_eq!(2, values.capacity()); } #[test] @@ -1239,7 +1275,11 @@ mod tests { assert_eq!( ConcreteDataType::decimal128_datatype(10, 2), ColumnDataTypeWrapper::decimal128_datatype(10, 2).into() - ) + ); + assert_eq!( + ConcreteDataType::vector_datatype(3), + ColumnDataTypeWrapper::vector_datatype(3).into() + ); } #[test] @@ -1335,6 +1375,10 @@ mod tests { .try_into() .unwrap() ); + assert_eq!( + ColumnDataTypeWrapper::vector_datatype(3), + ConcreteDataType::vector_datatype(3).try_into().unwrap() + ); let result: Result = ConcreteDataType::null_datatype().try_into(); assert!(result.is_err()); diff --git a/src/common/grpc/src/select.rs b/src/common/grpc/src/select.rs index 493893f49d..bf0dd918dd 100644 --- a/src/common/grpc/src/select.rs +++ b/src/common/grpc/src/select.rs @@ -218,6 +218,12 @@ pub fn values(arrays: &[VectorRef]) -> Result { Decimal128Vector, decimal128_values, |x| { convert_to_pb_decimal128(x) } + ), + ( + ConcreteDataType::Vector(_), + BinaryVector, + binary_values, + |x| { x.into() } ) ) } diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index d165a403f5..647cea839f 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -39,7 +39,7 @@ derive_builder.workspace = true etcd-client.workspace = true futures.workspace = true futures-util.workspace = true -hex = { version = "0.4" } +hex.workspace = true humantime-serde.workspace = true itertools.workspace = true lazy_static.workspace = true diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index 
12f5c8d1f2..a967ad5fc8 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -36,7 +36,7 @@ use crate::types::{ IntervalDayTimeType, IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType, JsonType, ListType, NullType, StringType, TimeMillisecondType, TimeType, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, TimestampType, - UInt16Type, UInt32Type, UInt64Type, UInt8Type, + UInt16Type, UInt32Type, UInt64Type, UInt8Type, VectorType, }; use crate::value::Value; use crate::vectors::MutableVector; @@ -84,6 +84,9 @@ pub enum ConcreteDataType { // JSON type: Json(JsonType), + + // Vector type: + Vector(VectorType), } impl fmt::Display for ConcreteDataType { @@ -132,6 +135,7 @@ impl fmt::Display for ConcreteDataType { ConcreteDataType::List(v) => write!(f, "{}", v.name()), ConcreteDataType::Dictionary(v) => write!(f, "{}", v.name()), ConcreteDataType::Json(v) => write!(f, "{}", v.name()), + ConcreteDataType::Vector(v) => write!(f, "{}", v.name()), } } } @@ -167,6 +171,7 @@ impl ConcreteDataType { | ConcreteDataType::Decimal128(_) | ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) + | ConcreteDataType::Vector(_) ) } @@ -225,6 +230,10 @@ impl ConcreteDataType { matches!(self, ConcreteDataType::Json(_)) } + pub fn is_vector(&self) -> bool { + matches!(self, ConcreteDataType::Vector(_)) + } + pub fn numerics() -> Vec { vec![ ConcreteDataType::int8_datatype(), @@ -334,6 +343,13 @@ impl ConcreteDataType { } } + pub fn as_vector(&self) -> Option { + match self { + ConcreteDataType::Vector(v) => Some(*v), + _ => None, + } + } + /// Checks if the data type can cast to another data type. 
pub fn can_arrow_type_cast_to(&self, to_type: &ConcreteDataType) -> bool { let array = arrow_array::new_empty_array(&self.as_arrow_type()); @@ -564,6 +580,14 @@ impl ConcreteDataType { pub fn decimal128_default_datatype() -> ConcreteDataType { Self::decimal128_datatype(DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE) } + + pub fn vector_datatype(dim: u32) -> ConcreteDataType { + ConcreteDataType::Vector(VectorType::new(dim)) + } + + pub fn vector_default_datatype() -> ConcreteDataType { + Self::vector_datatype(0) + } } /// Data type abstraction. @@ -757,6 +781,7 @@ mod tests { assert!(ConcreteDataType::duration_microsecond_datatype().is_stringifiable()); assert!(ConcreteDataType::duration_nanosecond_datatype().is_stringifiable()); assert!(ConcreteDataType::decimal128_datatype(10, 2).is_stringifiable()); + assert!(ConcreteDataType::vector_default_datatype().is_stringifiable()); } #[test] @@ -909,5 +934,9 @@ mod tests { .to_string(), "Dictionary" ); + assert_eq!( + ConcreteDataType::vector_datatype(3).to_string(), + "Vector(3)" + ); } } diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs index 44e3def665..705e5d9682 100644 --- a/src/datatypes/src/error.rs +++ b/src/datatypes/src/error.rs @@ -196,6 +196,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid Vector: {}", msg))] + InvalidVector { + msg: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Value exceeds the precision {} bound", precision))] ValueExceedsPrecision { precision: u8, @@ -213,6 +220,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to parse extended type in metadata: {}", value))] + ParseExtendedType { + value: String, + #[snafu(implicit)] + location: Location, + }, #[snafu(display("Invalid fulltext option: {}", msg))] InvalidFulltextOption { msg: String, @@ -238,6 +251,7 @@ impl ErrorExt for Error { | InvalidTimestampPrecision { .. } | InvalidPrecisionOrScale { .. } | InvalidJson { .. } + | InvalidVector { .. 
} | InvalidFulltextOption { .. } => StatusCode::InvalidArguments, ValueExceedsPrecision { .. } @@ -253,7 +267,8 @@ impl ErrorExt for Error { | ProjectArrowSchema { .. } | ToScalarValue { .. } | TryFromValue { .. } - | ConvertArrowArrayToScalars { .. } => StatusCode::Internal, + | ConvertArrowArrayToScalars { .. } + | ParseExtendedType { .. } => StatusCode::Internal, } } diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 9aecfdf424..2eaa0254fb 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -21,11 +21,12 @@ use std::fmt; use std::sync::Arc; use arrow::datatypes::{Field, Schema as ArrowSchema}; +use column_schema::ColumnExtType; use datafusion_common::DFSchemaRef; use snafu::{ensure, ResultExt}; use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result}; -use crate::prelude::DataType; +use crate::prelude::ConcreteDataType; pub use crate::schema::column_schema::{ ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER, @@ -263,9 +264,14 @@ fn collect_fields(column_schemas: &[ColumnSchema]) -> Result { } let mut field = Field::try_from(column_schema)?; - // Json column performs the same as binary column in Arrow, so we need to mark it - if column_schema.data_type.is_json() { - let metadata = HashMap::from([(TYPE_KEY.to_string(), column_schema.data_type.name())]); + // Column with type Json or Vector performs the same as binary column in Arrow, so we need to mark it + let extype = match column_schema.data_type { + ConcreteDataType::Json(_) => Some(ColumnExtType::Json), + ConcreteDataType::Vector(d) => Some(ColumnExtType::Vector(d.dim)), + _ => None, + }; + if let Some(extype) = extype { + let metadata = HashMap::from([(TYPE_KEY.to_string(), extype.to_string())]); field = field.with_metadata(metadata); } fields.push(field); diff --git a/src/datatypes/src/schema/column_schema.rs 
b/src/datatypes/src/schema/column_schema.rs index 17a6d086c5..c1e2df8469 100644 --- a/src/datatypes/src/schema/column_schema.rs +++ b/src/datatypes/src/schema/column_schema.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::fmt; +use std::str::FromStr; use arrow::datatypes::Field; use serde::{Deserialize, Serialize}; @@ -21,10 +22,9 @@ use snafu::{ensure, ResultExt}; use sqlparser_derive::{Visit, VisitMut}; use crate::data_type::{ConcreteDataType, DataType}; -use crate::error::{self, Error, InvalidFulltextOptionSnafu, Result}; +use crate::error::{self, Error, InvalidFulltextOptionSnafu, ParseExtendedTypeSnafu, Result}; use crate::schema::constraint::ColumnDefaultConstraint; use crate::schema::TYPE_KEY; -use crate::types::JSON_TYPE_NAME; use crate::value::Value; use crate::vectors::VectorRef; @@ -300,17 +300,57 @@ impl ColumnSchema { } } +/// Column extended type set in column schema's metadata. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ColumnExtType { + /// Json type. + Json, + + /// Vector type with dimension. + Vector(u32), +} + +impl fmt::Display for ColumnExtType { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + ColumnExtType::Json => write!(f, "Json"), + ColumnExtType::Vector(dim) => write!(f, "Vector({})", dim), + } + } +} + +impl FromStr for ColumnExtType { + type Err = String; + + fn from_str(s: &str) -> std::result::Result { + match s { + "Json" => Ok(ColumnExtType::Json), + _ if s.starts_with("Vector(") && s.ends_with(')') => s[7..s.len() - 1] + .parse::() + .map(ColumnExtType::Vector) + .map_err(|_| "Invalid dimension for Vector".to_string()), + _ => Err("Unknown variant".to_string()), + } + } +} + impl TryFrom<&Field> for ColumnSchema { type Error = Error; fn try_from(field: &Field) -> Result { let mut data_type = ConcreteDataType::try_from(field.data_type())?; // Override the data type if it is specified in the metadata. 
- if field.metadata().contains_key(TYPE_KEY) { - data_type = match field.metadata().get(TYPE_KEY).unwrap().as_str() { - JSON_TYPE_NAME => ConcreteDataType::json_datatype(), - _ => data_type, - }; + if let Some(s) = field.metadata().get(TYPE_KEY) { + let extype = ColumnExtType::from_str(s) + .map_err(|_| ParseExtendedTypeSnafu { value: s }.build())?; + match extype { + ColumnExtType::Json => { + data_type = ConcreteDataType::json_datatype(); + } + ColumnExtType::Vector(dim) => { + data_type = ConcreteDataType::vector_datatype(dim); + } + } } let mut metadata = field.metadata().clone(); let default_constraint = match metadata.remove(DEFAULT_CONSTRAINT_KEY) { @@ -661,5 +701,24 @@ mod tests { column_schema.metadata.get(TYPE_KEY).unwrap(), &ConcreteDataType::json_datatype().name() ); + + let field = Field::new("test", ArrowDataType::Binary, true); + let field = field.with_metadata(Metadata::from([( + TYPE_KEY.to_string(), + ConcreteDataType::vector_datatype(3).name(), + )])); + let column_schema = ColumnSchema::try_from(&field).unwrap(); + assert_eq!("test", column_schema.name); + assert_eq!( + ConcreteDataType::vector_datatype(3), + column_schema.data_type + ); + assert!(column_schema.is_nullable); + assert!(!column_schema.is_time_index); + assert!(column_schema.default_constraint.is_none()); + assert_eq!( + column_schema.metadata.get(TYPE_KEY).unwrap(), + &ConcreteDataType::vector_datatype(3).name() + ); } } diff --git a/src/datatypes/src/schema/constraint.rs b/src/datatypes/src/schema/constraint.rs index c236d09769..faf08a1bb8 100644 --- a/src/datatypes/src/schema/constraint.rs +++ b/src/datatypes/src/schema/constraint.rs @@ -94,7 +94,7 @@ impl ColumnDefaultConstraint { // Whether the value could be nullable has been checked before, only need // to check the type compatibility here. 
ensure!( - data_type.logical_type_id() == v.logical_type_id(), + value_type_match(data_type, v.data_type()), error::DefaultValueTypeSnafu { reason: format!( "column has type {:?} but default value has type {:?}", @@ -215,6 +215,17 @@ fn create_current_timestamp_vector( } } +fn value_type_match(column_type: &ConcreteDataType, value_type: ConcreteDataType) -> bool { + match (column_type, value_type) { + (ct, vt) if ct.logical_type_id() == vt.logical_type_id() => true, + // Vector and Json type is encoded as binary + (ConcreteDataType::Vector(_) | ConcreteDataType::Json(_), ConcreteDataType::Binary(_)) => { + true + } + _ => false, + } +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/src/datatypes/src/type_id.rs b/src/datatypes/src/type_id.rs index d7496a54e0..eafb0202d9 100644 --- a/src/datatypes/src/type_id.rs +++ b/src/datatypes/src/type_id.rs @@ -70,6 +70,8 @@ pub enum LogicalTypeId { Dictionary, Json, + + Vector, } impl LogicalTypeId { @@ -129,6 +131,7 @@ impl LogicalTypeId { LogicalTypeId::DurationNanosecond => ConcreteDataType::duration_nanosecond_datatype(), LogicalTypeId::Decimal128 => ConcreteDataType::decimal128_default_datatype(), LogicalTypeId::Json => ConcreteDataType::json_datatype(), + LogicalTypeId::Vector => ConcreteDataType::vector_default_datatype(), } } } diff --git a/src/datatypes/src/types.rs b/src/datatypes/src/types.rs index a0e6d501a6..4e991c1868 100644 --- a/src/datatypes/src/types.rs +++ b/src/datatypes/src/types.rs @@ -28,6 +28,7 @@ mod primitive_type; mod string_type; mod time_type; mod timestamp_type; +mod vector_type; pub use binary_type::BinaryType; pub use boolean_type::BooleanType; @@ -58,3 +59,4 @@ pub use timestamp_type::{ TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, TimestampType, }; +pub use vector_type::{parse_string_to_vector_type_value, vector_type_value_to_string, VectorType}; diff --git a/src/datatypes/src/types/vector_type.rs 
b/src/datatypes/src/types/vector_type.rs new file mode 100644 index 0000000000..83ecbb049f --- /dev/null +++ b/src/datatypes/src/types/vector_type.rs @@ -0,0 +1,237 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use arrow::datatypes::DataType as ArrowDataType; +use common_base::bytes::Bytes; +use serde::{Deserialize, Serialize}; + +use crate::data_type::DataType; +use crate::error::{InvalidVectorSnafu, Result}; +use crate::scalars::ScalarVectorBuilder; +use crate::type_id::LogicalTypeId; +use crate::value::Value; +use crate::vectors::{BinaryVectorBuilder, MutableVector}; + +/// `VectorType` is a data type for vector data with a fixed dimension. +/// The type of items in the vector is float32. +/// It is stored as binary data that contains the concatenated float32 values. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +pub struct VectorType { + pub dim: u32, +} + +impl VectorType { + pub fn new(dim: u32) -> Self { + Self { dim } + } +} + +impl DataType for VectorType { + fn name(&self) -> String { + format!("Vector({})", self.dim) + } + + fn logical_type_id(&self) -> LogicalTypeId { + LogicalTypeId::Vector + } + + fn default_value(&self) -> Value { + Bytes::default().into() + } + + fn as_arrow_type(&self) -> ArrowDataType { + ArrowDataType::Binary + } + + fn create_mutable_vector(&self, capacity: usize) -> Box { + Box::new(BinaryVectorBuilder::with_capacity(capacity)) + } + + fn try_cast(&self, from: Value) -> Option { + match from { + Value::Binary(v) => Some(Value::Binary(v)), + _ => None, + } + } +} + +/// Converts a vector type value to string +/// for example: [1.0, 2.0, 3.0] -> "[1.0,2.0,3.0]" +pub fn vector_type_value_to_string(val: &[u8], dim: u32) -> Result { + let expected_len = dim as usize * std::mem::size_of::(); + if val.len() != expected_len { + return InvalidVectorSnafu { + msg: format!( + "Failed to convert Vector value to string: wrong byte size, expected {}, got {}", + expected_len, + val.len() + ), + } + .fail(); + } + + if dim == 0 { + return Ok("[]".to_string()); + } + + let elements = unsafe { + std::slice::from_raw_parts( + val.as_ptr() as *const f32, + val.len() / std::mem::size_of::(), + ) + }; + + let mut s = String::from("["); + for (i, e) in elements.iter().enumerate() { + if i > 0 { + s.push(','); + } + s.push_str(&e.to_string()); + } + s.push(']'); + Ok(s) +} + +/// Parses a string to a vector type value +/// Valid input format: "[1.0,2.0,3.0]", "[1.0, 2.0, 3.0]" +pub fn parse_string_to_vector_type_value(s: &str, dim: u32) -> Result> { + // Trim the brackets + let trimmed = s.trim(); + if !trimmed.starts_with('[') || !trimmed.ends_with(']') { + return InvalidVectorSnafu { + msg: format!("Failed to parse {s} to Vector value: not properly enclosed in 
brackets"), + } + .fail(); + } + // Remove the brackets + let content = trimmed[1..trimmed.len() - 1].trim(); + + if content.is_empty() { + if dim != 0 { + return InvalidVectorSnafu { + msg: format!("Failed to parse {s} to Vector value: wrong dimension"), + } + .fail(); + } + return Ok(vec![]); + } + + let elements = content + .split(',') + .map(|e| { + e.trim().parse::().map_err(|_| { + InvalidVectorSnafu { + msg: format!( + "Failed to parse {s} to Vector value: elements are not all float32" + ), + } + .build() + }) + }) + .collect::>>()?; + + // Check dimension + if elements.len() != dim as usize { + return InvalidVectorSnafu { + msg: format!("Failed to parse {s} to Vector value: wrong dimension"), + } + .fail(); + } + + // Convert Vec to Vec + let bytes = unsafe { + std::slice::from_raw_parts( + elements.as_ptr() as *const u8, + elements.len() * std::mem::size_of::(), + ) + .to_vec() + }; + + Ok(bytes) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_conversion_between_string_and_vector_type_value() { + let dim = 3; + + let cases = [ + ("[1.0,2.0,3]", "[1,2,3]"), + ("[0.0 , 0.0 , 0.0]", "[0,0,0]"), + ("[3.4028235e38, -3.4028235e38, 1.1754944e-38]", "[340282350000000000000000000000000000000,-340282350000000000000000000000000000000,0.000000000000000000000000000000000000011754944]"), + ]; + + for (s, expected) in cases.iter() { + let val = parse_string_to_vector_type_value(s, dim).unwrap(); + let s = vector_type_value_to_string(&val, dim).unwrap(); + assert_eq!(s, *expected); + } + + let dim = 0; + let cases = [("[]", "[]"), ("[ ]", "[]"), ("[ ]", "[]")]; + for (s, expected) in cases.iter() { + let val = parse_string_to_vector_type_value(s, dim).unwrap(); + let s = vector_type_value_to_string(&val, dim).unwrap(); + assert_eq!(s, *expected); + } + } + + #[test] + fn test_vector_type_value_to_string_wrong_byte_size() { + let dim = 3; + let val = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]; + let res = vector_type_value_to_string(&val, dim); + 
assert!(res.is_err()); + + let dim = 0; + let val = vec![1]; + let res = vector_type_value_to_string(&val, dim); + assert!(res.is_err()); + } + + #[test] + fn test_parse_string_to_vector_type_value_not_properly_enclosed_in_brackets() { + let dim = 3; + let s = "1.0,2.0,3.0"; + let res = parse_string_to_vector_type_value(s, dim); + assert!(res.is_err()); + + let s = "[1.0,2.0,3.0"; + let res = parse_string_to_vector_type_value(s, dim); + assert!(res.is_err()); + + let s = "1.0,2.0,3.0]"; + let res = parse_string_to_vector_type_value(s, dim); + assert!(res.is_err()); + } + + #[test] + fn test_parse_string_to_vector_type_value_wrong_dimension() { + let dim = 3; + let s = "[1.0,2.0]"; + let res = parse_string_to_vector_type_value(s, dim); + assert!(res.is_err()); + } + + #[test] + fn test_parse_string_to_vector_type_value_elements_are_not_all_float32() { + let dim = 3; + let s = "[1.0,2.0,ah]"; + let res = parse_string_to_vector_type_value(s, dim); + assert!(res.is_err()); + } +} diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index 6bc0c35ee7..d0f36139ca 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -520,7 +520,9 @@ pub fn to_null_scalar_value(output_type: &ConcreteDataType) -> Result ScalarValue::UInt64(None), ConcreteDataType::Float32(_) => ScalarValue::Float32(None), ConcreteDataType::Float64(_) => ScalarValue::Float64(None), - ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) => ScalarValue::Binary(None), + ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) | ConcreteDataType::Vector(_) => { + ScalarValue::Binary(None) + } ConcreteDataType::String(_) => ScalarValue::Utf8(None), ConcreteDataType::Date(_) => ScalarValue::Date32(None), ConcreteDataType::DateTime(_) => ScalarValue::Date64(None), diff --git a/src/datatypes/src/vectors/binary.rs b/src/datatypes/src/vectors/binary.rs index 9bcbd11d93..c4e8349714 100644 --- a/src/datatypes/src/vectors/binary.rs +++ b/src/datatypes/src/vectors/binary.rs @@ -20,9 
+20,10 @@ use snafu::ResultExt; use crate::arrow_array::{BinaryArray, MutableBinaryArray}; use crate::data_type::ConcreteDataType; -use crate::error::{self, Result}; +use crate::error::{self, InvalidVectorSnafu, Result}; use crate::scalars::{ScalarVector, ScalarVectorBuilder}; use crate::serialize::Serializable; +use crate::types::parse_string_to_vector_type_value; use crate::value::{Value, ValueRef}; use crate::vectors::{self, MutableVector, Validity, Vector, VectorRef}; @@ -66,6 +67,40 @@ impl BinaryVector { } Ok(BinaryVector::from(vector)) } + + pub fn convert_binary_to_vector(&self, dim: u32) -> Result { + let arrow_array = self.to_arrow_array(); + let mut vector = vec![]; + for binary in arrow_array + .as_any() + .downcast_ref::() + .unwrap() + .iter() + { + let v = if let Some(binary) = binary { + let bytes_size = dim as usize * std::mem::size_of::(); + if let Ok(s) = String::from_utf8(binary.to_vec()) { + let v = parse_string_to_vector_type_value(&s, dim)?; + Some(v) + } else if binary.len() == dim as usize * std::mem::size_of::() { + Some(binary.to_vec()) + } else { + return InvalidVectorSnafu { + msg: format!( + "Unexpected bytes size for vector value, expected {}, got {}", + bytes_size, + binary.len() + ), + } + .fail(); + } + } else { + None + }; + vector.push(v); + } + Ok(BinaryVector::from(vector)) + } } impl From for BinaryVector { @@ -473,4 +508,45 @@ mod tests { .unwrap_err(); assert_matches!(error, error::Error::InvalidJson { .. 
}); } + + #[test] + fn test_binary_vector_conversion() { + let dim = 3; + let vector = BinaryVector::from(vec![ + Some(b"[1,2,3]".to_vec()), + Some(b"[4,5,6]".to_vec()), + Some(b"[7,8,9]".to_vec()), + None, + ]); + let expected = BinaryVector::from(vec![ + Some( + [1.0f32, 2.0, 3.0] + .iter() + .flat_map(|v| v.to_le_bytes()) + .collect(), + ), + Some( + [4.0f32, 5.0, 6.0] + .iter() + .flat_map(|v| v.to_le_bytes()) + .collect(), + ), + Some( + [7.0f32, 8.0, 9.0] + .iter() + .flat_map(|v| v.to_le_bytes()) + .collect(), + ), + None, + ]); + + let converted = vector.convert_binary_to_vector(dim).unwrap(); + assert_eq!(converted.len(), expected.len()); + for i in 0..3 { + assert_eq!( + converted.get_ref(i).as_binary().unwrap().unwrap(), + expected.get_ref(i).as_binary().unwrap().unwrap() + ); + } + } } diff --git a/src/datatypes/src/vectors/eq.rs b/src/datatypes/src/vectors/eq.rs index 16b0adf6f6..ae923f58dc 100644 --- a/src/datatypes/src/vectors/eq.rs +++ b/src/datatypes/src/vectors/eq.rs @@ -80,7 +80,7 @@ fn equal(lhs: &dyn Vector, rhs: &dyn Vector) -> bool { match lhs.data_type() { Null(_) => true, Boolean(_) => is_vector_eq!(BooleanVector, lhs, rhs), - Binary(_) | Json(_) => is_vector_eq!(BinaryVector, lhs, rhs), + Binary(_) | Json(_) | Vector(_) => is_vector_eq!(BinaryVector, lhs, rhs), String(_) => is_vector_eq!(StringVector, lhs, rhs), Date(_) => is_vector_eq!(DateVector, lhs, rhs), DateTime(_) => is_vector_eq!(DateTimeVector, lhs, rhs), diff --git a/src/datatypes/src/vectors/operations.rs b/src/datatypes/src/vectors/operations.rs index caa0730f72..418dd2c14c 100644 --- a/src/datatypes/src/vectors/operations.rs +++ b/src/datatypes/src/vectors/operations.rs @@ -91,10 +91,17 @@ macro_rules! 
impl_scalar_vector_op { } fn cast(&self, to_type: &ConcreteDataType) -> Result { - if to_type == &ConcreteDataType::json_datatype() { - if let Some(vector) = self.as_any().downcast_ref::() { - let json_vector = vector.convert_binary_to_json()?; - return Ok(Arc::new(json_vector) as VectorRef); + if let Some(vector) = self.as_any().downcast_ref::() { + match to_type { + ConcreteDataType::Json(_) => { + let json_vector = vector.convert_binary_to_json()?; + return Ok(Arc::new(json_vector) as VectorRef); + } + ConcreteDataType::Vector(d) => { + let vector = vector.convert_binary_to_vector(d.dim)?; + return Ok(Arc::new(vector) as VectorRef); + } + _ => {} } } cast::cast_non_constant!(self, to_type) diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 1e4c6b8dc9..299a345578 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -369,16 +369,23 @@ pub(crate) fn validate_proto_value( column_schema: &ColumnSchema, ) -> Result<()> { if let Some(value_type) = proto_value_type(value) { + let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| { + InvalidRequestSnafu { + region_id, + reason: format!( + "column {} has unknown type {}", + column_schema.column_name, column_schema.datatype + ), + } + .build() + })?; ensure!( - value_type as i32 == column_schema.datatype, + proto_value_type_match(column_type, value_type), InvalidRequestSnafu { region_id, reason: format!( "value has type {:?}, but column {} has type {:?}({})", - value_type, - column_schema.column_name, - ColumnDataType::try_from(column_schema.datatype), - column_schema.datatype, + value_type, column_schema.column_name, column_type, column_schema.datatype, ), } ); @@ -387,6 +394,14 @@ pub(crate) fn validate_proto_value( Ok(()) } +fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool { + match (column_type, value_type) { + (ct, vt) if ct == vt => true, + (ColumnDataType::Vector, ColumnDataType::Binary) => true, + _ => false, + 
} +} + /// Oneshot output result sender. #[derive(Debug)] pub struct OutputTx(Sender>); diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs index 38e6b68a65..d1545148fc 100644 --- a/src/mito2/src/row_converter.rs +++ b/src/mito2/src/row_converter.rs @@ -69,7 +69,9 @@ impl SortField { ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9, ConcreteDataType::Float32(_) => 5, ConcreteDataType::Float64(_) => 9, - ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) => 11, + ConcreteDataType::Binary(_) + | ConcreteDataType::Json(_) + | ConcreteDataType::Vector(_) => 11, ConcreteDataType::String(_) => 11, // a non-empty string takes at least 11 bytes. ConcreteDataType::Date(_) => 5, ConcreteDataType::DateTime(_) => 9, @@ -165,7 +167,8 @@ impl SortField { Time, time, Duration, duration, Decimal128, decimal128, - Json, binary + Json, binary, + Vector, binary ); Ok(()) @@ -188,7 +191,7 @@ impl SortField { Ok(Value::from(Option::<$f>::deserialize(deserializer).context(error::DeserializeFieldSnafu)?)) } )* - ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) => Ok(Value::from( + ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) | ConcreteDataType::Vector(_) => Ok(Value::from( Option::>::deserialize(deserializer) .context(error::DeserializeFieldSnafu)? .map(Bytes::from), @@ -273,7 +276,9 @@ impl SortField { ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9, ConcreteDataType::Float32(_) => 5, ConcreteDataType::Float64(_) => 9, - ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) => { + ConcreteDataType::Binary(_) + | ConcreteDataType::Json(_) + | ConcreteDataType::Vector(_) => { // Now the encoder encode binary as a list of bytes so we can't use // skip bytes. 
let pos_before = deserializer.position(); @@ -606,6 +611,7 @@ mod tests { ConcreteDataType::interval_day_time_datatype(), ConcreteDataType::interval_month_day_nano_datatype(), ConcreteDataType::decimal128_default_datatype(), + ConcreteDataType::vector_datatype(3), ], vec![ Value::Boolean(true), @@ -630,6 +636,7 @@ mod tests { Value::IntervalDayTime(IntervalDayTime::new(1, 15)), Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 15)), Value::Decimal128(Decimal128::from(16)), + Value::Binary(Bytes::from(vec![0; 12])), ], ); } diff --git a/src/operator/src/req_convert/common.rs b/src/operator/src/req_convert/common.rs index d619a602e8..2822f1dbec 100644 --- a/src/operator/src/req_convert/common.rs +++ b/src/operator/src/req_convert/common.rs @@ -25,6 +25,7 @@ use api::v1::{ RowDeleteRequest, RowInsertRequest, Rows, SemanticType, Value, }; use common_base::BitVec; +use datatypes::prelude::ConcreteDataType; use datatypes::vectors::VectorRef; use snafu::prelude::*; use snafu::ResultExt; @@ -53,6 +54,7 @@ fn encode_string_to_jsonb_binary(value_data: ValueData) -> Result { /// Prepares row insertion requests by converting any JSON values to binary JSONB format. pub fn preprocess_row_insert_requests(requests: &mut Vec) -> Result<()> { for request in requests { + validate_rows(&request.rows)?; prepare_rows(&mut request.rows)?; } @@ -62,6 +64,7 @@ pub fn preprocess_row_insert_requests(requests: &mut Vec) -> R /// Prepares row deletion requests by converting any JSON values to binary JSONB format. 
pub fn preprocess_row_delete_requests(requests: &mut Vec) -> Result<()> { for request in requests { + validate_rows(&request.rows)?; prepare_rows(&mut request.rows)?; } @@ -102,6 +105,58 @@ fn prepare_rows(rows: &mut Option) -> Result<()> { Ok(()) } +fn validate_rows(rows: &Option) -> Result<()> { + let Some(rows) = rows else { + return Ok(()); + }; + + for (col_idx, schema) in rows.schema.iter().enumerate() { + let column_type = + ColumnDataTypeWrapper::try_new(schema.datatype, schema.datatype_extension.clone()) + .context(ColumnDataTypeSnafu)? + .into(); + + let ConcreteDataType::Vector(d) = column_type else { + return Ok(()); + }; + + for row in &rows.rows { + let value = &row.values[col_idx].value_data; + if let Some(data) = value { + validate_vector_col(data, d.dim)?; + } + } + } + + Ok(()) +} + +fn validate_vector_col(data: &ValueData, dim: u32) -> Result<()> { + let data = match data { + ValueData::BinaryValue(data) => data, + _ => { + return InvalidInsertRequestSnafu { + reason: "Expecting binary data for vector column.".to_string(), + } + .fail(); + } + }; + + let expected_len = dim as usize * std::mem::size_of::(); + if data.len() != expected_len { + return InvalidInsertRequestSnafu { + reason: format!( + "Expecting {} bytes of data for vector column, but got {}.", + expected_len, + data.len() + ), + } + .fail(); + } + + Ok(()) +} + pub fn columns_to_rows(columns: Vec, row_count: u32) -> Result { let row_count = row_count as usize; let column_count = columns.len(); @@ -236,6 +291,7 @@ fn push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> { interval_month_day_nano_values ), (Decimal128, Decimal128Value, decimal128_values), + (Vector, BinaryValue, binary_values), ); Ok(()) @@ -264,12 +320,7 @@ pub fn column_schema( ) -> Result> { columns .iter() - .map(|(column_name, vector)| { - let (datatype, datatype_extension) = - ColumnDataTypeWrapper::try_from(vector.data_type().clone()) - .context(ColumnDataTypeSnafu)? 
- .to_parts(); - + .map(|(column_name, _vector)| { let column_schema = table_info .meta .schema @@ -278,6 +329,11 @@ pub fn column_schema( msg: format!("unable to find column {column_name} in table schema"), })?; + let (datatype, datatype_extension) = + ColumnDataTypeWrapper::try_from(column_schema.data_type.clone()) + .context(ColumnDataTypeSnafu)? + .to_parts(); + Ok(ColumnSchema { column_name: column_name.clone(), datatype: datatype as i32, @@ -322,7 +378,7 @@ fn semantic_type(table_info: &TableInfo, column: &str) -> Result { #[cfg(test)] mod tests { use api::v1::column::Values; - use api::v1::SemanticType; + use api::v1::{SemanticType, VectorTypeExtension}; use common_base::bit_vec::prelude::*; use super::*; @@ -356,30 +412,57 @@ mod tests { }), ..Default::default() }, + Column { + column_name: String::from("col3"), + datatype: ColumnDataType::Vector.into(), + semantic_type: SemanticType::Field.into(), + null_mask: vec![], + values: Some(Values { + binary_values: vec![vec![0; 4], vec![1; 4], vec![2; 4]], + ..Default::default() + }), + datatype_extension: Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::VectorType(VectorTypeExtension { dim: 1 })), + }), + ..Default::default() + }, ]; let row_count = 3; let result = columns_to_rows(columns, row_count); let rows = result.unwrap(); - assert_eq!(rows.schema.len(), 2); + assert_eq!(rows.schema.len(), 3); assert_eq!(rows.schema[0].column_name, "col1"); assert_eq!(rows.schema[0].datatype, ColumnDataType::Int32 as i32); assert_eq!(rows.schema[0].semantic_type, SemanticType::Field as i32); assert_eq!(rows.schema[1].column_name, "col2"); assert_eq!(rows.schema[1].datatype, ColumnDataType::String as i32); assert_eq!(rows.schema[1].semantic_type, SemanticType::Tag as i32); + assert_eq!(rows.schema[2].column_name, "col3"); + assert_eq!(rows.schema[2].datatype, ColumnDataType::Vector as i32); + assert_eq!(rows.schema[2].semantic_type, SemanticType::Field as i32); + assert_eq!( + rows.schema[2].datatype_extension, + 
Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::VectorType(VectorTypeExtension { dim: 1 })) + }) + ); assert_eq!(rows.rows.len(), 3); - assert_eq!(rows.rows[0].values.len(), 2); + assert_eq!(rows.rows[0].values.len(), 3); assert_eq!(rows.rows[0].values[0].value_data, None); assert_eq!( rows.rows[0].values[1].value_data, Some(ValueData::StringValue(String::from("value1"))) ); + assert_eq!( + rows.rows[0].values[2].value_data, + Some(ValueData::BinaryValue(vec![0; 4])) + ); - assert_eq!(rows.rows[1].values.len(), 2); + assert_eq!(rows.rows[1].values.len(), 3); assert_eq!( rows.rows[1].values[0].value_data, Some(ValueData::I32Value(42)) @@ -388,13 +471,21 @@ mod tests { rows.rows[1].values[1].value_data, Some(ValueData::StringValue(String::from("value2"))) ); + assert_eq!( + rows.rows[1].values[2].value_data, + Some(ValueData::BinaryValue(vec![1; 4])) + ); - assert_eq!(rows.rows[2].values.len(), 2); + assert_eq!(rows.rows[2].values.len(), 3); assert_eq!(rows.rows[2].values[0].value_data, None); assert_eq!( rows.rows[2].values[1].value_data, Some(ValueData::StringValue(String::from("value3"))) ); + assert_eq!( + rows.rows[2].values[2].value_data, + Some(ValueData::BinaryValue(vec![2; 4])) + ); // wrong type let columns = vec![Column { @@ -441,4 +532,37 @@ mod tests { let row_count = 3; assert!(columns_to_rows(columns, row_count).is_err()); } + + #[test] + fn test_validate_vector_row_success() { + let data = ValueData::BinaryValue(vec![0; 4]); + let dim = 1; + assert!(validate_vector_col(&data, dim).is_ok()); + + let data = ValueData::BinaryValue(vec![0; 8]); + let dim = 2; + assert!(validate_vector_col(&data, dim).is_ok()); + + let data = ValueData::BinaryValue(vec![0; 12]); + let dim = 3; + assert!(validate_vector_col(&data, dim).is_ok()); + } + + #[test] + fn test_validate_vector_row_fail_wrong_type() { + let data = ValueData::I32Value(42); + let dim = 1; + assert!(validate_vector_col(&data, dim).is_err()); + } + + #[test] + fn 
test_validate_vector_row_fail_wrong_length() { + let data = ValueData::BinaryValue(vec![0; 8]); + let dim = 1; + assert!(validate_vector_col(&data, dim).is_err()); + + let data = ValueData::BinaryValue(vec![0; 4]); + let dim = 2; + assert!(validate_vector_col(&data, dim).is_err()); + } } diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 1abfadeddb..4ac143be3d 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -562,6 +562,12 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + #[snafu(display("Convert SQL value error"))] + ConvertSqlValue { + source: datatypes::error::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -674,6 +680,8 @@ impl ErrorExt for Error { ConvertScalarValue { source, .. } => source.status_code(), ToJson { .. } => StatusCode::Internal, + + ConvertSqlValue { source, .. } => source.status_code(), } } diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs index 6c7cff7494..6e46a2b652 100644 --- a/src/servers/src/mysql/writer.rs +++ b/src/servers/src/mysql/writer.rs @@ -21,6 +21,7 @@ use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use common_telemetry::{debug, error}; use datatypes::prelude::{ConcreteDataType, Value}; use datatypes::schema::SchemaRef; +use datatypes::types::vector_type_value_to_string; use futures::StreamExt; use opensrv_mysql::{ Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter, @@ -29,7 +30,7 @@ use session::context::QueryContextRef; use snafu::prelude::*; use tokio::io::AsyncWrite; -use crate::error::{self, Error, Result}; +use crate::error::{self, ConvertSqlValueSnafu, Error, Result}; use crate::metrics::*; /// Try to write multiple output to the writer if possible. @@ -168,7 +169,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { &mut row_writer, &record_batch, query_context.clone(), - &column_def, + &query_result.schema, ) .await? 
} @@ -192,10 +193,10 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { row_writer: &mut RowWriter<'_, W>, recordbatch: &RecordBatch, query_context: QueryContextRef, - column_def: &[Column], + schema: &SchemaRef, ) -> Result<()> { for row in recordbatch.rows() { - for (value, column) in row.into_iter().zip(column_def.iter()) { + for (value, column) in row.into_iter().zip(schema.column_schemas().iter()) { match value { Value::Null => row_writer.write_col(None::)?, Value::Boolean(v) => row_writer.write_col(v as i8)?, @@ -210,10 +211,15 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> { Value::Float32(v) => row_writer.write_col(v.0)?, Value::Float64(v) => row_writer.write_col(v.0)?, Value::String(v) => row_writer.write_col(v.as_utf8())?, - Value::Binary(v) => match column.coltype { - ColumnType::MYSQL_TYPE_JSON => { + Value::Binary(v) => match column.data_type { + ConcreteDataType::Json(_) => { row_writer.write_col(jsonb::to_string(&v))?; } + ConcreteDataType::Vector(d) => { + let s = vector_type_value_to_string(&v, d.dim) + .context(ConvertSqlValueSnafu)?; + row_writer.write_col(s)?; + } _ => { row_writer.write_col(v.deref())?; } @@ -295,6 +301,7 @@ pub(crate) fn create_mysql_column( ConcreteDataType::Duration(_) => Ok(ColumnType::MYSQL_TYPE_TIME), ConcreteDataType::Decimal128(_) => Ok(ColumnType::MYSQL_TYPE_DECIMAL), ConcreteDataType::Json(_) => Ok(ColumnType::MYSQL_TYPE_JSON), + ConcreteDataType::Vector(_) => Ok(ColumnType::MYSQL_TYPE_STRING), _ => error::UnsupportedDataTypeSnafu { data_type, reason: "not implemented", diff --git a/src/servers/src/postgres/types.rs b/src/servers/src/postgres/types.rs index 0bd8964a82..a26d803b80 100644 --- a/src/servers/src/postgres/types.rs +++ b/src/servers/src/postgres/types.rs @@ -27,7 +27,7 @@ use datafusion_expr::LogicalPlan; use datatypes::arrow::datatypes::DataType as ArrowDataType; use datatypes::prelude::{ConcreteDataType, Value}; use datatypes::schema::Schema; -use datatypes::types::{IntervalType, 
TimestampType}; +use datatypes::types::{vector_type_value_to_string, IntervalType, TimestampType}; use datatypes::value::ListValue; use pgwire::api::portal::{Format, Portal}; use pgwire::api::results::{DataRowEncoder, FieldInfo}; @@ -364,6 +364,24 @@ fn encode_array( .collect::>>>()?; builder.encode_field(&array) } + &ConcreteDataType::Vector(d) => { + let array = value_list + .items() + .iter() + .map(|v| match v { + Value::Null => Ok(None), + Value::Binary(v) => { + let s = vector_type_value_to_string(v, d.dim) + .map_err(|e| PgWireError::ApiError(Box::new(e)))?; + Ok(Some(s)) + } + _ => Err(PgWireError::ApiError(Box::new(Error::Internal { + err_msg: format!("Invalid list item type, find {v:?}, expected vector",), + }))), + }) + .collect::>>>()?; + builder.encode_field(&array) + } _ => Err(PgWireError::ApiError(Box::new(Error::Internal { err_msg: format!( "cannot write array type {:?} in postgres protocol: unimplemented", @@ -395,6 +413,11 @@ pub(super) fn encode_value( Value::String(v) => builder.encode_field(&v.as_utf8()), Value::Binary(v) => match datatype { ConcreteDataType::Json(_) => builder.encode_field(&jsonb::to_string(v)), + ConcreteDataType::Vector(d) => { + let s = vector_type_value_to_string(v, d.dim) + .map_err(|e| PgWireError::ApiError(Box::new(e)))?; + builder.encode_field(&s) + } _ => { let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output(); match *bytea_output { @@ -499,6 +522,7 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result { &ConcreteDataType::Json(_) => Ok(Type::JSON_ARRAY), &ConcreteDataType::Duration(_) | &ConcreteDataType::Dictionary(_) + | &ConcreteDataType::Vector(_) | &ConcreteDataType::List(_) => server_error::UnsupportedDataTypeSnafu { data_type: origin, reason: "not implemented", @@ -512,6 +536,7 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result { } .fail() } + &ConcreteDataType::Vector(_) => Ok(Type::FLOAT4_ARRAY), } } diff --git a/src/sql/src/error.rs 
b/src/sql/src/error.rs index 245210055e..d05ccf8e54 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -325,6 +325,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Datatype error: {}", source))] + Datatype { + source: datatypes::error::Error, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -363,6 +370,7 @@ impl ErrorExt for Error { SerializeColumnDefaultConstraint { source, .. } => source.status_code(), ConvertToGrpcDataType { source, .. } => source.status_code(), + Datatype { source, .. } => source.status_code(), ConvertToDfStatement { .. } => StatusCode::Internal, ConvertSqlValue { .. } | ConvertValue { .. } => StatusCode::Unsupported, diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 43b0d5b359..8bbc72a17f 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -39,7 +39,7 @@ use crate::parser::{ParserContext, FLOW}; use crate::parsers::utils::validate_column_fulltext_create_option; use crate::statements::create::{ Column, ColumnExtensions, CreateDatabase, CreateExternalTable, CreateFlow, CreateTable, - CreateTableLike, CreateView, Partitions, TableConstraint, + CreateTableLike, CreateView, Partitions, TableConstraint, VECTOR_OPT_DIM, }; use crate::statements::statement::Statement; use crate::statements::{ @@ -668,6 +668,31 @@ impl<'a> ParserContext<'a> { column_type: &DataType, column_extensions: &mut ColumnExtensions, ) -> Result { + if let DataType::Custom(name, tokens) = column_type + && name.0.len() == 1 + && &name.0[0].value.to_uppercase() == "VECTOR" + { + ensure!( + tokens.len() == 1, + InvalidColumnOptionSnafu { + name: column_name.to_string(), + msg: "VECTOR type should have dimension", + } + ); + + let dimension = + tokens[0] + .parse::() + .ok() + .with_context(|| InvalidColumnOptionSnafu { + name: column_name.to_string(), + msg: "dimension should be a positive integer", + })?; + + let 
options = HashMap::from_iter([(VECTOR_OPT_DIM.to_string(), dimension.to_string())]); + column_extensions.vector_options = Some(options.into()); + } + if parser.parse_keyword(Keyword::FULLTEXT) { ensure!( column_extensions.fulltext_options.is_none(), diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index f3a60651a1..fd2cb3dee2 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -42,10 +42,10 @@ use common_time::Timestamp; use datatypes::prelude::ConcreteDataType; use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN}; use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, COMMENT_KEY}; -use datatypes::types::{cast, TimestampType}; +use datatypes::types::{cast, parse_string_to_vector_type_value, TimestampType}; use datatypes::value::{OrderedF32, OrderedF64, Value}; use snafu::{ensure, OptionExt, ResultExt}; -use sqlparser::ast::{ExactNumberInfo, UnaryOperator}; +use sqlparser::ast::{ExactNumberInfo, Ident, ObjectName, UnaryOperator}; use crate::ast::{ ColumnDef, ColumnOption, ColumnOptionDef, DataType as SqlDataType, Expr, TimezoneInfo, @@ -53,7 +53,7 @@ use crate::ast::{ }; use crate::error::{ self, ColumnTypeMismatchSnafu, ConvertSqlValueSnafu, ConvertToGrpcDataTypeSnafu, - ConvertValueSnafu, InvalidCastSnafu, InvalidSqlValueSnafu, InvalidUnaryOpSnafu, + ConvertValueSnafu, DatatypeSnafu, InvalidCastSnafu, InvalidSqlValueSnafu, InvalidUnaryOpSnafu, ParseSqlValueSnafu, Result, SerializeColumnDefaultConstraintSnafu, SetFulltextOptionSnafu, TimestampOverflowSnafu, UnsupportedDefaultValueSnafu, UnsupportedUnaryOpSnafu, }; @@ -61,6 +61,8 @@ use crate::statements::create::Column; pub use crate::statements::option_map::OptionMap; pub use crate::statements::transform::{get_data_type_by_alias_name, transform_statements}; +const VECTOR_TYPE_NAME: &str = "VECTOR"; + fn parse_string_to_value( column_name: &str, s: String, @@ -134,6 +136,10 @@ fn parse_string_to_value( .fail() } } + ConcreteDataType::Vector(d) 
=> { + let v = parse_string_to_vector_type_value(&s, d.dim).context(DatatypeSnafu)?; + Ok(Value::Binary(v.into())) + } _ => { unreachable!() } @@ -614,6 +620,20 @@ pub fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result Ok(ConcreteDataType::json_datatype()), + // Vector type + SqlDataType::Custom(name, d) + if name.0.as_slice().len() == 1 + && name.0.as_slice()[0].value.to_ascii_uppercase() == VECTOR_TYPE_NAME + && d.len() == 1 => + { + let dim = d[0].parse().map_err(|e| { + error::ParseSqlValueSnafu { + msg: format!("Failed to parse vector dimension: {}", e), + } + .build() + })?; + Ok(ConcreteDataType::vector_datatype(dim)) + } _ => error::SqlTypeNotSupportedSnafu { t: data_type.clone(), } @@ -651,6 +671,10 @@ pub fn concrete_data_type_to_sql_data_type(data_type: &ConcreteDataType) -> Resu ExactNumberInfo::PrecisionAndScale(d.precision() as u64, d.scale() as u64), )), ConcreteDataType::Json(_) => Ok(SqlDataType::JSON), + ConcreteDataType::Vector(v) => Ok(SqlDataType::Custom( + ObjectName(vec![Ident::new(VECTOR_TYPE_NAME)]), + vec![v.dim.to_string()], + )), ConcreteDataType::Duration(_) | ConcreteDataType::Null(_) | ConcreteDataType::List(_) @@ -766,6 +790,14 @@ mod tests { SqlDataType::Interval, ConcreteDataType::interval_month_day_nano_datatype(), ); + check_type(SqlDataType::JSON, ConcreteDataType::json_datatype()); + check_type( + SqlDataType::Custom( + ObjectName(vec![Ident::new(VECTOR_TYPE_NAME)]), + vec!["3".to_string()], + ), + ConcreteDataType::vector_datatype(3), + ); } #[test] @@ -1489,6 +1521,7 @@ mod tests { ]) .into(), ), + vector_options: None, }, }; @@ -1501,7 +1534,7 @@ mod tests { } #[test] - pub fn test_parse_placeholder_value() { + fn test_parse_placeholder_value() { assert!(sql_value_to_value( "test", &ConcreteDataType::string_datatype(), diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index e376be67d0..20ed7b5559 100644 --- a/src/sql/src/statements/create.rs +++ 
b/src/sql/src/statements/create.rs @@ -30,6 +30,7 @@ use crate::statements::OptionMap; const LINE_SEP: &str = ",\n"; const COMMA_SEP: &str = ", "; const INDENT: usize = 2; +pub const VECTOR_OPT_DIM: &str = "dim"; macro_rules! format_indent { ($fmt: expr, $arg: expr) => { @@ -112,6 +113,8 @@ pub struct Column { pub struct ColumnExtensions { /// Fulltext options. pub fulltext_options: Option, + /// Vector options. + pub vector_options: Option, } impl Column { @@ -138,6 +141,13 @@ impl Column { impl Display for Column { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + if let Some(vector_options) = &self.extensions.vector_options { + if let Some(dim) = vector_options.get(VECTOR_OPT_DIM) { + write!(f, "{} VECTOR({})", self.column_def.name, dim)?; + return Ok(()); + } + } + write!(f, "{}", self.column_def)?; if let Some(fulltext_options) = &self.extensions.fulltext_options { if !fulltext_options.is_empty() { diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 38c3c01557..dba8688eea 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -83,6 +83,7 @@ zstd.workspace = true [dev-dependencies] datafusion.workspace = true datafusion-expr.workspace = true +hex.workspace = true itertools.workspace = true opentelemetry-proto.workspace = true partition.workspace = true diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index 3acfa0d863..5e5e402b85 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -17,14 +17,16 @@ mod test { use std::collections::HashMap; use api::v1::column::Values; + use api::v1::column_data_type_extension::TypeExt; use api::v1::ddl_request::Expr as DdlExpr; use api::v1::greptime_request::Request; use api::v1::query_request::Query; use api::v1::region::QueryRequest as RegionQueryRequest; use api::v1::{ - alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, - CreateDatabaseExpr, CreateTableExpr, DdlRequest, 
DeleteRequest, DeleteRequests, - DropTableExpr, InsertRequest, InsertRequests, QueryRequest, SemanticType, + alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, + ColumnDataTypeExtension, ColumnDef, CreateDatabaseExpr, CreateTableExpr, DdlRequest, + DeleteRequest, DeleteRequests, DropTableExpr, InsertRequest, InsertRequests, QueryRequest, + SemanticType, VectorTypeExtension, }; use client::OutputData; use common_catalog::consts::MITO_ENGINE; @@ -204,6 +206,7 @@ CREATE TABLE {table_name} ( a INT, b STRING, c JSON, + d VECTOR(3), ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY (a, b, c) @@ -352,6 +355,27 @@ CREATE TABLE {table_name} ( r#"{ "id": 15, "transactions": [{ "amount": 500, "date": "2024-01-01" }, { "amount": -200, "date": "2024-02-01" }] }"#.to_string(), r#"{ "id": 16, "transactions": [{ "amount": 500, "date": "2024-01-01" }] }"#.to_string(), ]; + let vector_values = [ + [1.0f32, 2.0, 3.0], + [4.0, 5.0, 6.0], + [7.0, 8.0, 9.0], + [10.0, 11.0, 12.0], + [13.0, 14.0, 15.0], + [16.0, 17.0, 18.0], + [19.0, 20.0, 21.0], + [22.0, 23.0, 24.0], + [25.0, 26.0, 27.0], + [28.0, 29.0, 30.0], + [31.0, 32.0, 33.0], + [34.0, 35.0, 36.0], + [37.0, 38.0, 39.0], + [40.0, 41.0, 42.0], + [43.0, 44.0, 45.0], + [46.0, 47.0, 48.0], + ] + .iter() + .map(|x| x.iter().flat_map(|&f| f.to_le_bytes()).collect::>()) + .collect::>(); let insert = InsertRequest { table_name: table_name.to_string(), @@ -390,6 +414,19 @@ CREATE TABLE {table_name} ( datatype: ColumnDataType::Json as i32, ..Default::default() }, + Column { + column_name: "d".to_string(), + values: Some(Values { + binary_values: vector_values.clone(), + ..Default::default() + }), + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::Vector as i32, + datatype_extension: Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::VectorType(VectorTypeExtension { dim: 3 })), + }), + ..Default::default() + }, Column { column_name: "ts".to_string(), values: Some(Values { @@ -414,7 +451,7 @@ CREATE TABLE 
{table_name} ( let request = Request::Query(QueryRequest { query: Some(Query::Sql(format!( - "SELECT ts, a, b, json_to_string(c) as c FROM {table_name} ORDER BY ts" + "SELECT ts, a, b, json_to_string(c) as c, d FROM {table_name} ORDER BY ts" ))), }); let output = query(instance, request.clone()).await; @@ -422,29 +459,53 @@ CREATE TABLE {table_name} ( unreachable!() }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - let expected = r#"+---------------------+----+-------------------+---------------------------------------------------------------------------------------------------+ -| ts | a | b | c | -+---------------------+----+-------------------+---------------------------------------------------------------------------------------------------+ -| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 | {"active":true,"age":30,"id":1,"name":"Alice"} | -| 2023-01-01T07:26:13 | 2 | ts: 1672557973000 | {"active":false,"balance":1234.56,"id":2,"name":"Bob"} | -| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 | {"age":28,"id":3,"tags":["rust","testing","json"]} | -| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 | {"id":4,"metadata":{"created_at":"2024-10-30T12:00:00Z","status":"inactive"}} | -| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 | {"id":5,"name":null,"phone":"+1234567890"} | -| 2023-01-01T07:26:17 | | ts: 1672557977000 | {"active":true,"height":5.9,"id":6,"weight":72.5} | -| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 | {"age":29,"id":7,"languages":["English","Spanish"]} | -| 2023-01-01T07:26:19 | 12 | ts: 1672557979000 | {"contact":{"email":"hank@example.com","phone":"+0987654321"},"id":8} | -| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 | {"id":9,"preferences":{"notifications":true,"theme":"dark"}} | -| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 | {"active":false,"id":10,"scores":[88,92,76]} | -| 2023-01-01T07:26:22 | 22 | ts: 1672557982000 | {"birthday":"1996-07-20","id":11,"location":{"city":"New York","zip":"10001"}} | -| 2023-01-01T07:26:23 
| 23 | ts: 1672557983000 | {"id":12,"subscription":{"expires":"2025-01-01","type":"premium"}} | -| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 | {"active":true,"id":13,"settings":{"brightness":0.6,"volume":0.8}} | -| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 | {"id":14,"notes":["first note","second note"],"priority":1} | -| 2023-01-01T07:26:26 | 52 | ts: 1672557986000 | {"id":15,"transactions":[{"amount":500,"date":"2024-01-01"},{"amount":-200,"date":"2024-02-01"}]} | -| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 | {"id":16,"transactions":[{"amount":500,"date":"2024-01-01"}]} | -+---------------------+----+-------------------+---------------------------------------------------------------------------------------------------+"#; - assert_eq!(recordbatches.pretty_print().unwrap(), expected); + let expected = r#"+---------------------+----+-------------------+---------------------------------------------------------------------------------------------------+--------------------------+ +| ts | a | b | c | d | ++---------------------+----+-------------------+---------------------------------------------------------------------------------------------------+--------------------------+ +| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 | {"active":true,"age":30,"id":1,"name":"Alice"} | 0000803f0000004000004040 | +| 2023-01-01T07:26:13 | 2 | ts: 1672557973000 | {"active":false,"balance":1234.56,"id":2,"name":"Bob"} | 000080400000a0400000c040 | +| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 | {"age":28,"id":3,"tags":["rust","testing","json"]} | 0000e0400000004100001041 | +| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 | {"id":4,"metadata":{"created_at":"2024-10-30T12:00:00Z","status":"inactive"}} | 000020410000304100004041 | +| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 | {"id":5,"name":null,"phone":"+1234567890"} | 000050410000604100007041 | +| 2023-01-01T07:26:17 | | ts: 1672557977000 | {"active":true,"height":5.9,"id":6,"weight":72.5} | 000080410000884100009041 | +| 
2023-01-01T07:26:18 | 11 | ts: 1672557978000 | {"age":29,"id":7,"languages":["English","Spanish"]} | 000098410000a0410000a841 | +| 2023-01-01T07:26:19 | 12 | ts: 1672557979000 | {"contact":{"email":"hank@example.com","phone":"+0987654321"},"id":8} | 0000b0410000b8410000c041 | +| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 | {"id":9,"preferences":{"notifications":true,"theme":"dark"}} | 0000c8410000d0410000d841 | +| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 | {"active":false,"id":10,"scores":[88,92,76]} | 0000e0410000e8410000f041 | +| 2023-01-01T07:26:22 | 22 | ts: 1672557982000 | {"birthday":"1996-07-20","id":11,"location":{"city":"New York","zip":"10001"}} | 0000f8410000004200000442 | +| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 | {"id":12,"subscription":{"expires":"2025-01-01","type":"premium"}} | 0000084200000c4200001042 | +| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 | {"active":true,"id":13,"settings":{"brightness":0.6,"volume":0.8}} | 000014420000184200001c42 | +| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 | {"id":14,"notes":["first note","second note"],"priority":1} | 000020420000244200002842 | +| 2023-01-01T07:26:26 | 52 | ts: 1672557986000 | {"id":15,"transactions":[{"amount":500,"date":"2024-01-01"},{"amount":-200,"date":"2024-02-01"}]} | 00002c420000304200003442 | +| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 | {"id":16,"transactions":[{"amount":500,"date":"2024-01-01"}]} | 0000384200003c4200004042 | ++---------------------+----+-------------------+---------------------------------------------------------------------------------------------------+--------------------------+"#; + similar_asserts::assert_eq!(recordbatches.pretty_print().unwrap(), expected); - let new_grpc_delete_request = |a, b, c, ts, row_count| DeleteRequest { + // Checks if the encoded vector values are as expected. 
+ let hex_repr_of_vector_values = vector_values.iter().map(hex::encode).collect::>(); + assert_eq!( + hex_repr_of_vector_values, + vec![ + "0000803f0000004000004040", + "000080400000a0400000c040", + "0000e0400000004100001041", + "000020410000304100004041", + "000050410000604100007041", + "000080410000884100009041", + "000098410000a0410000a841", + "0000b0410000b8410000c041", + "0000c8410000d0410000d841", + "0000e0410000e8410000f041", + "0000f8410000004200000442", + "0000084200000c4200001042", + "000014420000184200001c42", + "000020420000244200002842", + "00002c420000304200003442", + "0000384200003c4200004042", + ] + ); + + let new_grpc_delete_request = |a, b, c, d, ts, row_count| DeleteRequest { table_name: table_name.to_string(), key_columns: vec![ Column { @@ -477,6 +538,19 @@ CREATE TABLE {table_name} ( datatype: ColumnDataType::Json as i32, ..Default::default() }, + Column { + column_name: "d".to_string(), + values: Some(Values { + binary_values: d, + ..Default::default() + }), + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::Vector as i32, + datatype_extension: Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::VectorType(VectorTypeExtension { dim: 3 })), + }), + ..Default::default() + }, Column { column_name: "ts".to_string(), semantic_type: SemanticType::Timestamp as i32, @@ -504,6 +578,12 @@ CREATE TABLE {table_name} ( r#"{ "id": 11, "birthday": "1996-07-20", "location": { "city": "New York", "zip": "10001" } }"#.to_string(), r#"{ "id": 15, "transactions": [{ "amount": 500, "date": "2024-01-01" }, { "amount": -200, "date": "2024-02-01" }] }"#.to_string(), ], + vec![ + [4.0f32, 5.0, 6.0].iter().flat_map(|f| f.to_le_bytes()).collect::>(), + [22.0f32, 23.0, 24.0].iter().flat_map(|f| f.to_le_bytes()).collect::>(), + [31.0f32, 32.0, 33.0].iter().flat_map(|f| f.to_le_bytes()).collect::>(), + [43.0f32, 44.0, 45.0].iter().flat_map(|f| f.to_le_bytes()).collect::>(), + ], vec![1672557973000, 1672557979000, 1672557982000, 1672557986000], 
4, ); @@ -518,6 +598,16 @@ CREATE TABLE {table_name} ( r#"{ "id": 16, "transactions": [{ "amount": 500, "date": "2024-01-01" }] }"# .to_string(), ], + vec![ + [7.0f32, 8.0, 9.0] + .iter() + .flat_map(|f| f.to_le_bytes()) + .collect::>(), + [46.0f32, 47.0, 48.0] + .iter() + .flat_map(|f| f.to_le_bytes()) + .collect::>(), + ], vec![1672557974000, 1672557987000], 2, ); @@ -535,20 +625,20 @@ CREATE TABLE {table_name} ( unreachable!() }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - let expected = r#"+---------------------+----+-------------------+-------------------------------------------------------------------------------+ -| ts | a | b | c | -+---------------------+----+-------------------+-------------------------------------------------------------------------------+ -| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 | {"active":true,"age":30,"id":1,"name":"Alice"} | -| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 | {"id":4,"metadata":{"created_at":"2024-10-30T12:00:00Z","status":"inactive"}} | -| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 | {"id":5,"name":null,"phone":"+1234567890"} | -| 2023-01-01T07:26:17 | | ts: 1672557977000 | {"active":true,"height":5.9,"id":6,"weight":72.5} | -| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 | {"age":29,"id":7,"languages":["English","Spanish"]} | -| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 | {"id":9,"preferences":{"notifications":true,"theme":"dark"}} | -| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 | {"active":false,"id":10,"scores":[88,92,76]} | -| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 | {"id":12,"subscription":{"expires":"2025-01-01","type":"premium"}} | -| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 | {"active":true,"id":13,"settings":{"brightness":0.6,"volume":0.8}} | -| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 | {"id":14,"notes":["first note","second note"],"priority":1} | 
-+---------------------+----+-------------------+-------------------------------------------------------------------------------+"#; + let expected = r#"+---------------------+----+-------------------+-------------------------------------------------------------------------------+--------------------------+ +| ts | a | b | c | d | ++---------------------+----+-------------------+-------------------------------------------------------------------------------+--------------------------+ +| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 | {"active":true,"age":30,"id":1,"name":"Alice"} | 0000803f0000004000004040 | +| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 | {"id":4,"metadata":{"created_at":"2024-10-30T12:00:00Z","status":"inactive"}} | 000020410000304100004041 | +| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 | {"id":5,"name":null,"phone":"+1234567890"} | 000050410000604100007041 | +| 2023-01-01T07:26:17 | | ts: 1672557977000 | {"active":true,"height":5.9,"id":6,"weight":72.5} | 000080410000884100009041 | +| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 | {"age":29,"id":7,"languages":["English","Spanish"]} | 000098410000a0410000a841 | +| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 | {"id":9,"preferences":{"notifications":true,"theme":"dark"}} | 0000c8410000d0410000d841 | +| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 | {"active":false,"id":10,"scores":[88,92,76]} | 0000e0410000e8410000f041 | +| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 | {"id":12,"subscription":{"expires":"2025-01-01","type":"premium"}} | 0000084200000c4200001042 | +| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 | {"active":true,"id":13,"settings":{"brightness":0.6,"volume":0.8}} | 000014420000184200001c42 | +| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 | {"id":14,"notes":["first note","second note"],"priority":1} | 000020420000244200002842 | 
++---------------------+----+-------------------+-------------------------------------------------------------------------------+--------------------------+"#; similar_asserts::assert_eq!(recordbatches.pretty_print().unwrap(), expected); } diff --git a/tests-integration/tests/sql.rs b/tests-integration/tests/sql.rs index 1a481e533d..4e16beb052 100644 --- a/tests-integration/tests/sql.rs +++ b/tests-integration/tests/sql.rs @@ -165,7 +165,7 @@ pub async fn test_mysql_crud(store_type: StorageType) { .unwrap(); sqlx::query( - "create table demo(i bigint, ts timestamp time index default current_timestamp, d date default null, dt datetime default null, b blob default null, j json default null)", + "create table demo(i bigint, ts timestamp time index default current_timestamp, d date default null, dt datetime default null, b blob default null, j json default null, v vector(3) default null)", ) .execute(&pool) .await .unwrap(); @@ -178,7 +178,7 @@ pub async fn test_mysql_crud(store_type: StorageType) { let d = NaiveDate::from_yo_opt(2015, 100).unwrap(); let hello = format!("hello{i}"); let bytes = hello.as_bytes(); - let jsons = serde_json::json!({ + let json = serde_json::json!({ "code": i, "success": true, "payload": { @@ -189,19 +189,21 @@ pub async fn test_mysql_crud(store_type: StorageType) { "homepage": null } }); - sqlx::query("insert into demo values(?, ?, ?, ?, ?, ?)") + let vector = "[1,2,3]"; + sqlx::query("insert into demo values(?, ?, ?, ?, ?, ?, ?)") .bind(i) .bind(i) .bind(d) .bind(dt) .bind(bytes) - .bind(jsons) + .bind(json) + .bind(vector) .execute(&pool) .await .unwrap(); } - let rows = sqlx::query("select i, d, dt, b, j from demo") + let rows = sqlx::query("select i, d, dt, b, j, v from demo") .fetch_all(&pool) .await .unwrap(); @@ -213,6 +215,7 @@ pub async fn test_mysql_crud(store_type: StorageType) { let dt: DateTime<Utc> = row.get("dt"); let bytes: Vec<u8> = row.get("b"); let json: serde_json::Value = row.get("j"); + let vector: String = row.get("v"); assert_eq!(ret, 
i as i64); let expected_d = NaiveDate::from_yo_opt(2015, 100).unwrap(); assert_eq!(expected_d, d); @@ -239,6 +242,7 @@ pub async fn test_mysql_crud(store_type: StorageType) { } }); assert_eq!(json, expected_j); + assert_eq!(vector, "[1,2,3]"); } let rows = sqlx::query("select i from demo where i=?") diff --git a/tests/cases/standalone/common/types/vector/vector.result b/tests/cases/standalone/common/types/vector/vector.result new file mode 100644 index 0000000000..d9b5a2e61e --- /dev/null +++ b/tests/cases/standalone/common/types/vector/vector.result @@ -0,0 +1,92 @@ +CREATE TABLE t (ts TIMESTAMP TIME INDEX, v VECTOR(3)); + +Affected Rows: 0 + +INSERT INTO t VALUES +(1, "[1.0, 2.0, 3.0]"), +(2, "[4.0, 5.0, 6.0]"), +(3, "[7.0, 8.0, 9.0]"); + +Affected Rows: 3 + +-- SQLNESS PROTOCOL MYSQL +SELECT * FROM t; + ++----------------------------+---------+ +| ts | v | ++----------------------------+---------+ +| 1970-01-01 00:00:00.001000 | [1,2,3] | +| 1970-01-01 00:00:00.002000 | [4,5,6] | +| 1970-01-01 00:00:00.003000 | [7,8,9] | ++----------------------------+---------+ + +-- SQLNESS PROTOCOL POSTGRES +SELECT * FROM t; + ++----------------------------+-----------+ +| ts | v | ++----------------------------+-----------+ +| 1970-01-01 00:00:00.001000 | "[1,2,3]" | +| 1970-01-01 00:00:00.002000 | "[4,5,6]" | +| 1970-01-01 00:00:00.003000 | "[7,8,9]" | ++----------------------------+-----------+ + +-- Unexpected dimension -- +INSERT INTO t VALUES +(4, "[1.0]"); + +Error: 1004(InvalidArguments), Invalid Vector: Failed to parse [1.0] to Vector value: wrong dimension + +-- Invalid vector value -- +INSERT INTO t VALUES +(5, "1.0,2.0,3.0"); + +Error: 1004(InvalidArguments), Invalid Vector: Failed to parse 1.0,2.0,3.0 to Vector value: not properly enclosed in brackets + +-- Invalid vector value -- +INSERT INTO t VALUES +(6, "[30h, 40s, 50m]"); + +Error: 1004(InvalidArguments), Invalid Vector: Failed to parse [30h, 40s, 50m] to Vector value: elements are not all float32 + +CREATE 
TABLE t2 (ts TIMESTAMP TIME INDEX, v VECTOR(3) DEFAULT '[1.0, 2.0, 3.0]'); + +Affected Rows: 0 + +INSERT INTO t2 (ts) VALUES +(1), +(2), +(3); + +Affected Rows: 3 + +-- SQLNESS PROTOCOL MYSQL +SELECT * FROM t2; + ++----------------------------+---------+ +| ts | v | ++----------------------------+---------+ +| 1970-01-01 00:00:00.001000 | [1,2,3] | +| 1970-01-01 00:00:00.002000 | [1,2,3] | +| 1970-01-01 00:00:00.003000 | [1,2,3] | ++----------------------------+---------+ + +-- SQLNESS PROTOCOL POSTGRES +SELECT * FROM t2; + ++----------------------------+-----------+ +| ts | v | ++----------------------------+-----------+ +| 1970-01-01 00:00:00.001000 | "[1,2,3]" | +| 1970-01-01 00:00:00.002000 | "[1,2,3]" | +| 1970-01-01 00:00:00.003000 | "[1,2,3]" | ++----------------------------+-----------+ + +DROP TABLE t; + +Affected Rows: 0 + +DROP TABLE t2; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/types/vector/vector.sql b/tests/cases/standalone/common/types/vector/vector.sql new file mode 100644 index 0000000000..376f356aaa --- /dev/null +++ b/tests/cases/standalone/common/types/vector/vector.sql @@ -0,0 +1,41 @@ +CREATE TABLE t (ts TIMESTAMP TIME INDEX, v VECTOR(3)); + +INSERT INTO t VALUES +(1, "[1.0, 2.0, 3.0]"), +(2, "[4.0, 5.0, 6.0]"), +(3, "[7.0, 8.0, 9.0]"); + +-- SQLNESS PROTOCOL MYSQL +SELECT * FROM t; + +-- SQLNESS PROTOCOL POSTGRES +SELECT * FROM t; + +-- Unexpected dimension -- +INSERT INTO t VALUES +(4, "[1.0]"); + +-- Invalid vector value -- +INSERT INTO t VALUES +(5, "1.0,2.0,3.0"); + +-- Invalid vector value -- +INSERT INTO t VALUES +(6, "[30h, 40s, 50m]"); + +CREATE TABLE t2 (ts TIMESTAMP TIME INDEX, v VECTOR(3) DEFAULT '[1.0, 2.0, 3.0]'); + +INSERT INTO t2 (ts) VALUES +(1), +(2), +(3); + +-- SQLNESS PROTOCOL MYSQL +SELECT * FROM t2; + +-- SQLNESS PROTOCOL POSTGRES +SELECT * FROM t2; + +DROP TABLE t; + +DROP TABLE t2;