From 6cd7319d67c0bd76359fd83bf83ca4edac22fecd Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Wed, 16 Aug 2023 19:25:25 +0800 Subject: [PATCH] refactor: grpc insert (#2188) * feat: interval type for row protocol * feat: minor refactor grpc insert * Update src/common/grpc-expr/src/util.rs Co-authored-by: Ruihang Xia * fix: by comment --------- Co-authored-by: Ruihang Xia --- Cargo.lock | 3 +- Cargo.toml | 2 +- src/api/Cargo.toml | 3 + src/api/src/helper.rs | 627 ++++++++++++++++-- src/client/src/database.rs | 97 +-- src/common/grpc-expr/src/error.rs | 9 +- src/common/grpc-expr/src/insert.rs | 809 +---------------------- src/common/grpc-expr/src/lib.rs | 3 +- src/common/grpc-expr/src/util.rs | 184 ++++++ src/datanode/src/instance/grpc.rs | 2 +- src/frontend/src/instance/distributed.rs | 2 +- src/frontend/src/instance/grpc.rs | 2 +- src/servers/tests/mod.rs | 2 +- 13 files changed, 796 insertions(+), 949 deletions(-) create mode 100644 src/common/grpc-expr/src/util.rs diff --git a/Cargo.lock b/Cargo.lock index c07ae5e3b8..c8a522ee35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -212,6 +212,7 @@ dependencies = [ "common-time", "datatypes", "greptime-proto", + "paste", "prost", "snafu", "tonic 0.9.2", @@ -4136,7 +4137,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/NiwakaDev/greptime-proto.git?rev=ec402b6500f908a0acfab6c889225cd4dc2228a4#ec402b6500f908a0acfab6c889225cd4dc2228a4" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=fa79839bbb304e0e7097e1cc4bcac9a63b3e496a#fa79839bbb304e0e7097e1cc4bcac9a63b3e496a" dependencies = [ "prost", "serde", diff --git a/Cargo.toml b/Cargo.toml index 7e5f5268d4..134caa88e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,7 +77,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git derive_builder = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/NiwakaDev/greptime-proto.git", rev = "ec402b6500f908a0acfab6c889225cd4dc2228a4" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "fa79839bbb304e0e7097e1cc4bcac9a63b3e496a" } itertools = "0.10" lazy_static = "1.4" once_cell = "1.18" diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 048566f84a..05fb337857 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -16,3 +16,6 @@ tonic.workspace = true [build-dependencies] tonic-build = "0.9" + +[dev-dependencies] +paste = "1.0" diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index c894a4c723..bae965c8e8 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -12,15 +12,27 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use common_base::BitVec; use common_time::interval::IntervalUnit; use common_time::time::Time; use common_time::timestamp::TimeUnit; use common_time::{Date, DateTime, Interval, Timestamp}; use datatypes::prelude::{ConcreteDataType, ValueRef}; -use datatypes::types::{IntervalType, TimeType, TimestampType}; +use datatypes::scalars::ScalarVector; +use datatypes::types::{ + Int16Type, Int8Type, IntervalType, TimeType, TimestampType, UInt16Type, UInt8Type, +}; use datatypes::value::{OrderedF32, OrderedF64, Value}; -use datatypes::vectors::VectorRef; +use datatypes::vectors::{ + BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector, + Int32Vector, Int64Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector, + IntervalYearMonthVector, PrimitiveVector, StringVector, TimeMicrosecondVector, + TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector, + TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, + UInt64Vector, VectorRef, +}; use greptime_proto::v1; use greptime_proto::v1::ddl_request::Expr; use greptime_proto::v1::greptime_request::Request; @@ -300,9 +312,9 @@ pub fn request_type(request: &Request) -> &'static str { Request::Inserts(_) => "inserts", Request::Query(query_req) => query_request_type(query_req), Request::Ddl(ddl_req) => ddl_request_type(ddl_req), - Request::RowInserts(_) => "row_inserts", - Request::RowDelete(_) => "row_delete", Request::Deletes(_) => "deletes", + Request::RowInserts(_) => "row_inserts", + Request::RowDeletes(_) => "row_deletes", } } @@ -341,60 +353,265 @@ pub fn convert_i128_to_interval(v: i128) -> IntervalMonthDayNano { } } -pub fn pb_value_to_value_ref(value: &greptime_proto::v1::Value) -> ValueRef { +pub fn pb_value_to_value_ref(value: &v1::Value) -> ValueRef { let Some(value) = &value.value_data else { return ValueRef::Null; }; match value { - greptime_proto::v1::value::ValueData::I8Value(v) => ValueRef::Int8(*v as i8), - greptime_proto::v1::value::ValueData::I16Value(v) => ValueRef::Int16(*v as i16), - greptime_proto::v1::value::ValueData::I32Value(v) => ValueRef::Int32(*v), - greptime_proto::v1::value::ValueData::I64Value(v) => ValueRef::Int64(*v), - greptime_proto::v1::value::ValueData::U8Value(v) => ValueRef::UInt8(*v as u8), - greptime_proto::v1::value::ValueData::U16Value(v) => ValueRef::UInt16(*v as u16), - greptime_proto::v1::value::ValueData::U32Value(v) => ValueRef::UInt32(*v), - greptime_proto::v1::value::ValueData::U64Value(v) => ValueRef::UInt64(*v), - greptime_proto::v1::value::ValueData::F32Value(f) => { - ValueRef::Float32(OrderedF32::from(*f)) + ValueData::I8Value(v) => ValueRef::Int8(*v as i8), + ValueData::I16Value(v) => ValueRef::Int16(*v as i16), + ValueData::I32Value(v) => ValueRef::Int32(*v), + ValueData::I64Value(v) => ValueRef::Int64(*v), + ValueData::U8Value(v) => ValueRef::UInt8(*v as u8), + ValueData::U16Value(v) => ValueRef::UInt16(*v as u16), + ValueData::U32Value(v) => ValueRef::UInt32(*v), + ValueData::U64Value(v) => ValueRef::UInt64(*v), + ValueData::F32Value(f) => ValueRef::Float32(OrderedF32::from(*f)), + ValueData::F64Value(f) => ValueRef::Float64(OrderedF64::from(*f)), + ValueData::BoolValue(b) => ValueRef::Boolean(*b), + ValueData::BinaryValue(bytes) => ValueRef::Binary(bytes.as_slice()), + ValueData::StringValue(string) => ValueRef::String(string.as_str()), + ValueData::DateValue(d) => ValueRef::Date(Date::from(*d)), + ValueData::DatetimeValue(d) => ValueRef::DateTime(DateTime::new(*d)), + ValueData::TsSecondValue(t) => ValueRef::Timestamp(Timestamp::new_second(*t)), + ValueData::TsMillisecondValue(t) => ValueRef::Timestamp(Timestamp::new_millisecond(*t)), + ValueData::TsMicrosecondValue(t) => ValueRef::Timestamp(Timestamp::new_microsecond(*t)), + ValueData::TsNanosecondValue(t) => ValueRef::Timestamp(Timestamp::new_nanosecond(*t)), + ValueData::TimeSecondValue(t) => ValueRef::Time(Time::new_second(*t)), + ValueData::TimeMillisecondValue(t) => ValueRef::Time(Time::new_millisecond(*t)), + ValueData::TimeMicrosecondValue(t) => ValueRef::Time(Time::new_microsecond(*t)), + ValueData::TimeNanosecondValue(t) => ValueRef::Time(Time::new_nanosecond(*t)), + ValueData::IntervalYearMonthValues(v) => ValueRef::Interval(Interval::from_i32(*v)), + ValueData::IntervalDayTimeValues(v) => ValueRef::Interval(Interval::from_i64(*v)), + ValueData::IntervalMonthDayNanoValues(v) => { + let interval = Interval::from_month_day_nano(v.months, v.days, v.nanoseconds); + ValueRef::Interval(interval) } - greptime_proto::v1::value::ValueData::F64Value(f) => { - ValueRef::Float64(OrderedF64::from(*f)) + } +} + +pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) -> VectorRef { + match data_type { + ConcreteDataType::Boolean(_) => Arc::new(BooleanVector::from(values.bool_values)), + ConcreteDataType::Int8(_) => Arc::new(PrimitiveVector::::from_iter_values( + values.i8_values.into_iter().map(|x| x as i8), + )), + ConcreteDataType::Int16(_) => Arc::new(PrimitiveVector::::from_iter_values( + values.i16_values.into_iter().map(|x| x as i16), + )), + ConcreteDataType::Int32(_) => Arc::new(Int32Vector::from_vec(values.i32_values)), + ConcreteDataType::Int64(_) => Arc::new(Int64Vector::from_vec(values.i64_values)), + ConcreteDataType::UInt8(_) => Arc::new(PrimitiveVector::::from_iter_values( + values.u8_values.into_iter().map(|x| x as u8), + )), + ConcreteDataType::UInt16(_) => Arc::new(PrimitiveVector::::from_iter_values( + values.u16_values.into_iter().map(|x| x as u16), + )), + ConcreteDataType::UInt32(_) => Arc::new(UInt32Vector::from_vec(values.u32_values)), + ConcreteDataType::UInt64(_) => Arc::new(UInt64Vector::from_vec(values.u64_values)), + ConcreteDataType::Float32(_) => Arc::new(Float32Vector::from_vec(values.f32_values)), + ConcreteDataType::Float64(_) => Arc::new(Float64Vector::from_vec(values.f64_values)), + ConcreteDataType::Binary(_) => Arc::new(BinaryVector::from(values.binary_values)), + ConcreteDataType::String(_) => Arc::new(StringVector::from_vec(values.string_values)), + ConcreteDataType::Date(_) => Arc::new(DateVector::from_vec(values.date_values)), + ConcreteDataType::DateTime(_) => Arc::new(DateTimeVector::from_vec(values.datetime_values)), + ConcreteDataType::Timestamp(unit) => match unit { + TimestampType::Second(_) => { + Arc::new(TimestampSecondVector::from_vec(values.ts_second_values)) + } + TimestampType::Millisecond(_) => Arc::new(TimestampMillisecondVector::from_vec( + values.ts_millisecond_values, + )), + TimestampType::Microsecond(_) => Arc::new(TimestampMicrosecondVector::from_vec( + values.ts_microsecond_values, + )), + TimestampType::Nanosecond(_) => Arc::new(TimestampNanosecondVector::from_vec( + values.ts_nanosecond_values, + )), + }, + ConcreteDataType::Time(unit) => match unit { + TimeType::Second(_) => Arc::new(TimeSecondVector::from_iter_values( + values.time_second_values.iter().map(|x| *x as i32), + )), + TimeType::Millisecond(_) => Arc::new(TimeMillisecondVector::from_iter_values( + values.time_millisecond_values.iter().map(|x| *x as i32), + )), + TimeType::Microsecond(_) => Arc::new(TimeMicrosecondVector::from_vec( + values.time_microsecond_values, + )), + TimeType::Nanosecond(_) => Arc::new(TimeNanosecondVector::from_vec( + values.time_nanosecond_values, + )), + }, + + ConcreteDataType::Interval(unit) => match unit { + IntervalType::YearMonth(_) => Arc::new(IntervalYearMonthVector::from_vec( + values.interval_year_month_values, + )), + IntervalType::DayTime(_) => Arc::new(IntervalDayTimeVector::from_vec( + values.interval_day_time_values, + )), + IntervalType::MonthDayNano(_) => { + Arc::new(IntervalMonthDayNanoVector::from_iter_values( + values.interval_month_day_nano_values.iter().map(|x| { + Interval::from_month_day_nano(x.months, x.days, x.nanoseconds).to_i128() + }), + )) + } + }, + ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { + unreachable!() } - greptime_proto::v1::value::ValueData::BoolValue(b) => ValueRef::Boolean(*b), - greptime_proto::v1::value::ValueData::BinaryValue(bytes) => { - ValueRef::Binary(bytes.as_slice()) - } - greptime_proto::v1::value::ValueData::StringValue(string) => { - ValueRef::String(string.as_str()) - } - greptime_proto::v1::value::ValueData::DateValue(d) => ValueRef::Date(Date::from(*d)), - greptime_proto::v1::value::ValueData::DatetimeValue(d) => { - ValueRef::DateTime(DateTime::new(*d)) - } - greptime_proto::v1::value::ValueData::TsSecondValue(t) => { - ValueRef::Timestamp(Timestamp::new_second(*t)) - } - greptime_proto::v1::value::ValueData::TsMillisecondValue(t) => { - ValueRef::Timestamp(Timestamp::new_millisecond(*t)) - } - greptime_proto::v1::value::ValueData::TsMicrosecondValue(t) => { - ValueRef::Timestamp(Timestamp::new_microsecond(*t)) - } - greptime_proto::v1::value::ValueData::TsNanosecondValue(t) => { - ValueRef::Timestamp(Timestamp::new_nanosecond(*t)) - } - greptime_proto::v1::value::ValueData::TimeSecondValue(t) => { - ValueRef::Time(Time::new_second(*t)) - } - greptime_proto::v1::value::ValueData::TimeMillisecondValue(t) => { - ValueRef::Time(Time::new_millisecond(*t)) - } - greptime_proto::v1::value::ValueData::TimeMicrosecondValue(t) => { - ValueRef::Time(Time::new_microsecond(*t)) - } - greptime_proto::v1::value::ValueData::TimeNanosecondValue(t) => { - ValueRef::Time(Time::new_nanosecond(*t)) + } +} + +pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec { + // TODO(fys): use macros to optimize code + match data_type { + ConcreteDataType::Int64(_) => values + .i64_values + .into_iter() + .map(|val| val.into()) + .collect(), + ConcreteDataType::Float64(_) => values + .f64_values + .into_iter() + .map(|val| val.into()) + .collect(), + ConcreteDataType::String(_) => values + .string_values + .into_iter() + .map(|val| val.into()) + .collect(), + ConcreteDataType::Boolean(_) => values + .bool_values + .into_iter() + .map(|val| val.into()) + .collect(), + ConcreteDataType::Int8(_) => values + .i8_values + .into_iter() + // Safety: Since i32 only stores i8 data here, so i32 as i8 is safe. + .map(|val| (val as i8).into()) + .collect(), + ConcreteDataType::Int16(_) => values + .i16_values + .into_iter() + // Safety: Since i32 only stores i16 data here, so i32 as i16 is safe. + .map(|val| (val as i16).into()) + .collect(), + ConcreteDataType::Int32(_) => values + .i32_values + .into_iter() + .map(|val| val.into()) + .collect(), + ConcreteDataType::UInt8(_) => values + .u8_values + .into_iter() + // Safety: Since i32 only stores u8 data here, so i32 as u8 is safe. + .map(|val| (val as u8).into()) + .collect(), + ConcreteDataType::UInt16(_) => values + .u16_values + .into_iter() + // Safety: Since i32 only stores u16 data here, so i32 as u16 is safe. + .map(|val| (val as u16).into()) + .collect(), + ConcreteDataType::UInt32(_) => values + .u32_values + .into_iter() + .map(|val| val.into()) + .collect(), + ConcreteDataType::UInt64(_) => values + .u64_values + .into_iter() + .map(|val| val.into()) + .collect(), + ConcreteDataType::Float32(_) => values + .f32_values + .into_iter() + .map(|val| val.into()) + .collect(), + ConcreteDataType::Binary(_) => values + .binary_values + .into_iter() + .map(|val| val.into()) + .collect(), + ConcreteDataType::DateTime(_) => values + .datetime_values + .into_iter() + .map(|v| Value::DateTime(v.into())) + .collect(), + ConcreteDataType::Date(_) => values + .date_values + .into_iter() + .map(|v| Value::Date(v.into())) + .collect(), + ConcreteDataType::Timestamp(TimestampType::Second(_)) => values + .ts_second_values + .into_iter() + .map(|v| Value::Timestamp(Timestamp::new_second(v))) + .collect(), + ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => values + .ts_millisecond_values + .into_iter() + .map(|v| Value::Timestamp(Timestamp::new_millisecond(v))) + .collect(), + ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => values + .ts_microsecond_values + .into_iter() + .map(|v| Value::Timestamp(Timestamp::new_microsecond(v))) + .collect(), + ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => values + .ts_nanosecond_values + .into_iter() + .map(|v| Value::Timestamp(Timestamp::new_nanosecond(v))) + .collect(), + ConcreteDataType::Time(TimeType::Second(_)) => values + .time_second_values + .into_iter() + .map(|v| Value::Time(Time::new_second(v))) + .collect(), + ConcreteDataType::Time(TimeType::Millisecond(_)) => values + .time_millisecond_values + .into_iter() + .map(|v| Value::Time(Time::new_millisecond(v))) + .collect(), + ConcreteDataType::Time(TimeType::Microsecond(_)) => values + .time_microsecond_values + .into_iter() + .map(|v| Value::Time(Time::new_microsecond(v))) + .collect(), + ConcreteDataType::Time(TimeType::Nanosecond(_)) => values + .time_nanosecond_values + .into_iter() + .map(|v| Value::Time(Time::new_nanosecond(v))) + .collect(), + + ConcreteDataType::Interval(IntervalType::YearMonth(_)) => values + .interval_year_month_values + .into_iter() + .map(|v| Value::Interval(Interval::from_i32(v))) + .collect(), + ConcreteDataType::Interval(IntervalType::DayTime(_)) => values + .interval_day_time_values + .into_iter() + .map(|v| Value::Interval(Interval::from_i64(v))) + .collect(), + ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => values + .interval_month_day_nano_values + .into_iter() + .map(|v| { + Value::Interval(Interval::from_month_day_nano( + v.months, + v.days, + v.nanoseconds, + )) + }) + .collect(), + ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { + unreachable!() } } } @@ -478,19 +695,32 @@ pub fn to_proto_value(value: Value) -> Option { }, Value::Time(v) => match v.unit() { TimeUnit::Second => v1::Value { - value_data: Some(v1::value::ValueData::TimeSecondValue(v.value())), + value_data: Some(ValueData::TimeSecondValue(v.value())), }, TimeUnit::Millisecond => v1::Value { - value_data: Some(v1::value::ValueData::TimeMillisecondValue(v.value())), + value_data: Some(ValueData::TimeMillisecondValue(v.value())), }, TimeUnit::Microsecond => v1::Value { - value_data: Some(v1::value::ValueData::TimeMicrosecondValue(v.value())), + value_data: Some(ValueData::TimeMicrosecondValue(v.value())), }, TimeUnit::Nanosecond => v1::Value { - value_data: Some(v1::value::ValueData::TimeNanosecondValue(v.value())), + value_data: Some(ValueData::TimeNanosecondValue(v.value())), }, }, - Value::Interval(_) | Value::List(_) => return None, + Value::Interval(v) => match v.unit() { + IntervalUnit::YearMonth => v1::Value { + value_data: Some(ValueData::IntervalYearMonthValues(v.to_i32())), + }, + IntervalUnit::DayTime => v1::Value { + value_data: Some(ValueData::IntervalDayTimeValues(v.to_i64())), + }, + IntervalUnit::MonthDayNano => v1::Value { + value_data: Some(ValueData::IntervalMonthDayNanoValues( + convert_i128_to_interval(v.to_i128()), + )), + }, + }, + Value::List(_) => return None, }; Some(proto_value) @@ -500,8 +730,7 @@ pub fn to_proto_value(value: Value) -> Option { /// /// If value is null, returns `None`. pub fn proto_value_type(value: &v1::Value) -> Option { - let value_data = value.value_data.as_ref()?; - let value_type = match value_data { + let value_type = match value.value_data.as_ref()? { ValueData::I8Value(_) => ColumnDataType::Int8, ValueData::I16Value(_) => ColumnDataType::Int16, ValueData::I32Value(_) => ColumnDataType::Int32, @@ -525,6 +754,9 @@ pub fn proto_value_type(value: &v1::Value) -> Option { ValueData::TimeMillisecondValue(_) => ColumnDataType::TimeMillisecond, ValueData::TimeMicrosecondValue(_) => ColumnDataType::TimeMicrosecond, ValueData::TimeNanosecondValue(_) => ColumnDataType::TimeNanosecond, + ValueData::IntervalYearMonthValues(_) => ColumnDataType::IntervalYearMonth, + ValueData::IntervalDayTimeValues(_) => ColumnDataType::IntervalDayTime, + ValueData::IntervalMonthDayNanoValues(_) => ColumnDataType::IntervalMonthDayNano, }; Some(value_type) } @@ -583,12 +815,17 @@ fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType mod tests { use std::sync::Arc; + use datatypes::types::{ + IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, TimeMillisecondType, + TimeSecondType, TimestampMillisecondType, TimestampSecondType, + }; use datatypes::vectors::{ BooleanVector, IntervalDayTimeVector, IntervalMonthDayNanoVector, IntervalYearMonthVector, TimeMicrosecondVector, TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, Vector, }; + use paste::paste; use super::*; @@ -1009,4 +1246,278 @@ mod tests { assert_eq!(interval.days, 0); assert_eq!(interval.nanoseconds, 3000); } + + #[test] + fn test_convert_timestamp_values() { + // second + let actual = pb_values_to_values( + &ConcreteDataType::Timestamp(TimestampType::Second(TimestampSecondType)), + Values { + ts_second_values: vec![1_i64, 2_i64, 3_i64], + ..Default::default() + }, + ); + let expect = vec![ + Value::Timestamp(Timestamp::new_second(1_i64)), + Value::Timestamp(Timestamp::new_second(2_i64)), + Value::Timestamp(Timestamp::new_second(3_i64)), + ]; + assert_eq!(expect, actual); + + // millisecond + let actual = pb_values_to_values( + &ConcreteDataType::Timestamp(TimestampType::Millisecond(TimestampMillisecondType)), + Values { + ts_millisecond_values: vec![1_i64, 2_i64, 3_i64], + ..Default::default() + }, + ); + let expect = vec![ + Value::Timestamp(Timestamp::new_millisecond(1_i64)), + Value::Timestamp(Timestamp::new_millisecond(2_i64)), + Value::Timestamp(Timestamp::new_millisecond(3_i64)), + ]; + assert_eq!(expect, actual); + } + + #[test] + fn test_convert_time_values() { + // second + let actual = pb_values_to_values( + &ConcreteDataType::Time(TimeType::Second(TimeSecondType)), + Values { + time_second_values: vec![1_i64, 2_i64, 3_i64], + ..Default::default() + }, + ); + let expect = vec![ + Value::Time(Time::new_second(1_i64)), + Value::Time(Time::new_second(2_i64)), + Value::Time(Time::new_second(3_i64)), + ]; + assert_eq!(expect, actual); + + // millisecond + let actual = pb_values_to_values( + &ConcreteDataType::Time(TimeType::Millisecond(TimeMillisecondType)), + Values { + time_millisecond_values: vec![1_i64, 2_i64, 3_i64], + ..Default::default() + }, + ); + let expect = vec![ + Value::Time(Time::new_millisecond(1_i64)), + Value::Time(Time::new_millisecond(2_i64)), + Value::Time(Time::new_millisecond(3_i64)), + ]; + assert_eq!(expect, actual); + } + + #[test] + fn test_convert_interval_values() { + // year_month + let actual = pb_values_to_values( + &ConcreteDataType::Interval(IntervalType::YearMonth(IntervalYearMonthType)), + Values { + interval_year_month_values: vec![1_i32, 2_i32, 3_i32], + ..Default::default() + }, + ); + let expect = vec![ + Value::Interval(Interval::from_year_month(1_i32)), + Value::Interval(Interval::from_year_month(2_i32)), + Value::Interval(Interval::from_year_month(3_i32)), + ]; + assert_eq!(expect, actual); + + // day_time + let actual = pb_values_to_values( + &ConcreteDataType::Interval(IntervalType::DayTime(IntervalDayTimeType)), + Values { + interval_day_time_values: vec![1_i64, 2_i64, 3_i64], + ..Default::default() + }, + ); + let expect = vec![ + Value::Interval(Interval::from_i64(1_i64)), + Value::Interval(Interval::from_i64(2_i64)), + Value::Interval(Interval::from_i64(3_i64)), + ]; + assert_eq!(expect, actual); + + // month_day_nano + let actual = pb_values_to_values( + &ConcreteDataType::Interval(IntervalType::MonthDayNano(IntervalMonthDayNanoType)), + Values { + interval_month_day_nano_values: vec![ + IntervalMonthDayNano { + months: 1, + days: 2, + nanoseconds: 3, + }, + IntervalMonthDayNano { + months: 5, + days: 6, + nanoseconds: 7, + }, + IntervalMonthDayNano { + months: 9, + days: 10, + nanoseconds: 11, + }, + ], + ..Default::default() + }, + ); + let expect = vec![ + Value::Interval(Interval::from_month_day_nano(1, 2, 3)), + Value::Interval(Interval::from_month_day_nano(5, 6, 7)), + Value::Interval(Interval::from_month_day_nano(9, 10, 11)), + ]; + assert_eq!(expect, actual); + } + + macro_rules! test_convert_values { + ($grpc_data_type: ident, $values: expr, $concrete_data_type: ident, $expected_ret: expr) => { + paste! { + #[test] + fn []() { + let values = Values { + [<$grpc_data_type _values>]: $values, + ..Default::default() + }; + + let data_type = ConcreteDataType::[<$concrete_data_type _datatype>](); + let result = pb_values_to_values(&data_type, values); + + assert_eq!( + $expected_ret, + result + ); + } + } + }; + } + + test_convert_values!( + i8, + vec![1_i32, 2, 3], + int8, + vec![Value::Int8(1), Value::Int8(2), Value::Int8(3)] + ); + + test_convert_values!( + u8, + vec![1_u32, 2, 3], + uint8, + vec![Value::UInt8(1), Value::UInt8(2), Value::UInt8(3)] + ); + + test_convert_values!( + i16, + vec![1_i32, 2, 3], + int16, + vec![Value::Int16(1), Value::Int16(2), Value::Int16(3)] + ); + + test_convert_values!( + u16, + vec![1_u32, 2, 3], + uint16, + vec![Value::UInt16(1), Value::UInt16(2), Value::UInt16(3)] + ); + + test_convert_values!( + i32, + vec![1, 2, 3], + int32, + vec![Value::Int32(1), Value::Int32(2), Value::Int32(3)] + ); + + test_convert_values!( + u32, + vec![1, 2, 3], + uint32, + vec![Value::UInt32(1), Value::UInt32(2), Value::UInt32(3)] + ); + + test_convert_values!( + i64, + vec![1, 2, 3], + int64, + vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)] + ); + + test_convert_values!( + u64, + vec![1, 2, 3], + uint64, + vec![Value::UInt64(1), Value::UInt64(2), Value::UInt64(3)] + ); + + test_convert_values!( + f32, + vec![1.0, 2.0, 3.0], + float32, + vec![ + Value::Float32(1.0.into()), + Value::Float32(2.0.into()), + Value::Float32(3.0.into()) + ] + ); + + test_convert_values!( + f64, + vec![1.0, 2.0, 3.0], + float64, + vec![ + Value::Float64(1.0.into()), + Value::Float64(2.0.into()), + Value::Float64(3.0.into()) + ] + ); + + test_convert_values!( + string, + vec!["1".to_string(), "2".to_string(), "3".to_string()], + string, + vec![ + Value::String("1".into()), + Value::String("2".into()), + Value::String("3".into()) + ] + ); + + test_convert_values!( + binary, + vec!["1".into(), "2".into(), "3".into()], + binary, + vec![ + Value::Binary(b"1".to_vec().into()), + Value::Binary(b"2".to_vec().into()), + Value::Binary(b"3".to_vec().into()) + ] + ); + + test_convert_values!( + date, + vec![1, 2, 3], + date, + vec![ + Value::Date(1.into()), + Value::Date(2.into()), + Value::Date(3.into()) + ] + ); + + test_convert_values!( + datetime, + vec![1.into(), 2.into(), 3.into()], + datetime, + vec![ + Value::DateTime(1.into()), + Value::DateTime(2.into()), + Value::DateTime(3.into()) + ] + ); } diff --git a/src/client/src/database.rs b/src/client/src/database.rs index e0c7022282..5bd4fc79e1 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -342,106 +342,11 @@ pub struct FlightContext { #[cfg(test)] mod tests { - use std::sync::Arc; - - use api::helper::ColumnDataTypeWrapper; use api::v1::auth_header::AuthScheme; - use api::v1::{AuthHeader, Basic, Column}; - use common_grpc::select::{null_mask, values}; - use common_grpc_expr::column_to_vector; - use datatypes::prelude::{Vector, VectorRef}; - use datatypes::vectors::{ - BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector, - Int16Vector, Int32Vector, Int64Vector, Int8Vector, StringVector, UInt16Vector, - UInt32Vector, UInt64Vector, UInt8Vector, - }; + use api::v1::{AuthHeader, Basic}; use crate::database::FlightContext; - #[test] - fn test_column_to_vector() { - let mut column = create_test_column(Arc::new(BooleanVector::from(vec![true]))); - column.datatype = -100; - let result = column_to_vector(&column, 1); - assert!(result.is_err()); - assert_eq!( - result.unwrap_err().to_string(), - "Column datatype error, source: Unknown proto column datatype: -100" - ); - - macro_rules! test_with_vector { - ($vector: expr) => { - let vector = Arc::new($vector); - let column = create_test_column(vector.clone()); - let result = column_to_vector(&column, vector.len() as u32).unwrap(); - assert_eq!(result, vector as VectorRef); - }; - } - - test_with_vector!(BooleanVector::from(vec![Some(true), None, Some(false)])); - test_with_vector!(Int8Vector::from(vec![Some(i8::MIN), None, Some(i8::MAX)])); - test_with_vector!(Int16Vector::from(vec![ - Some(i16::MIN), - None, - Some(i16::MAX) - ])); - test_with_vector!(Int32Vector::from(vec![ - Some(i32::MIN), - None, - Some(i32::MAX) - ])); - test_with_vector!(Int64Vector::from(vec![ - Some(i64::MIN), - None, - Some(i64::MAX) - ])); - test_with_vector!(UInt8Vector::from(vec![Some(u8::MIN), None, Some(u8::MAX)])); - test_with_vector!(UInt16Vector::from(vec![ - Some(u16::MIN), - None, - Some(u16::MAX) - ])); - test_with_vector!(UInt32Vector::from(vec![ - Some(u32::MIN), - None, - Some(u32::MAX) - ])); - test_with_vector!(UInt64Vector::from(vec![ - Some(u64::MIN), - None, - Some(u64::MAX) - ])); - test_with_vector!(Float32Vector::from(vec![ - Some(f32::MIN), - None, - Some(f32::MAX) - ])); - test_with_vector!(Float64Vector::from(vec![ - Some(f64::MIN), - None, - Some(f64::MAX) - ])); - test_with_vector!(BinaryVector::from(vec![ - Some(b"".to_vec()), - None, - Some(b"hello".to_vec()) - ])); - test_with_vector!(StringVector::from(vec![Some(""), None, Some("foo"),])); - test_with_vector!(DateVector::from(vec![Some(1), None, Some(3)])); - test_with_vector!(DateTimeVector::from(vec![Some(4), None, Some(6)])); - } - - fn create_test_column(vector: VectorRef) -> Column { - let wrapper: ColumnDataTypeWrapper = vector.data_type().try_into().unwrap(); - Column { - column_name: "test".to_string(), - semantic_type: 1, - values: Some(values(&[vector.clone()]).unwrap()), - null_mask: null_mask(&[vector.clone()], vector.len()), - datatype: wrapper.datatype() as i32, - } - } - #[test] fn test_flight_ctx() { let mut ctx = FlightContext::default(); diff --git a/src/common/grpc-expr/src/error.rs b/src/common/grpc-expr/src/error.rs index 37763fd972..c364b7e942 100644 --- a/src/common/grpc-expr/src/error.rs +++ b/src/common/grpc-expr/src/error.rs @@ -47,6 +47,9 @@ pub enum Error { location: Location, }, + #[snafu(display("Duplicated column name in gRPC requests, name: {}", name,))] + DuplicatedColumnName { name: String, location: Location }, + #[snafu(display("Missing timestamp column, msg: {}", msg))] MissingTimestampColumn { msg: String, location: Location }, @@ -101,9 +104,9 @@ impl ErrorExt for Error { Error::IllegalDeleteRequest { .. } => StatusCode::InvalidArguments, Error::ColumnDataType { .. } => StatusCode::Internal, - Error::DuplicatedTimestampColumn { .. } | Error::MissingTimestampColumn { .. } => { - StatusCode::InvalidArguments - } + Error::DuplicatedTimestampColumn { .. } + | Error::DuplicatedColumnName { .. } + | Error::MissingTimestampColumn { .. } => StatusCode::InvalidArguments, Error::InvalidColumnProto { .. } => StatusCode::InvalidArguments, Error::CreateVector { .. } => StatusCode::InvalidArguments, Error::MissingField { .. } => StatusCode::InvalidArguments, diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index 97531e3f8d..fa8fe002fe 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -12,233 +12,30 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; -use std::sync::Arc; +use std::collections::HashMap; +use api::helper; use api::helper::ColumnDataTypeWrapper; use api::v1::column::Values; -use api::v1::{ - AddColumn, AddColumns, Column, ColumnDataType, ColumnDef, CreateTableExpr, - InsertRequest as GrpcInsertRequest, SemanticType, -}; +use api::v1::{AddColumns, Column, CreateTableExpr, InsertRequest as GrpcInsertRequest}; use common_base::BitVec; -use common_time::time::Time; -use common_time::timestamp::Timestamp; -use common_time::{Date, DateTime, Interval}; use datatypes::data_type::{ConcreteDataType, DataType}; -use datatypes::prelude::{ValueRef, VectorRef}; -use datatypes::scalars::ScalarVector; +use datatypes::prelude::VectorRef; use datatypes::schema::SchemaRef; -use datatypes::types::{ - Int16Type, Int8Type, IntervalType, TimeType, TimestampType, UInt16Type, UInt8Type, -}; -use datatypes::value::Value; -use datatypes::vectors::{ - BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector, - Int32Vector, Int64Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector, - IntervalYearMonthVector, PrimitiveVector, StringVector, TimeMicrosecondVector, - TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector, - TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, - UInt64Vector, -}; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{ensure, ResultExt}; use table::metadata::TableId; use table::requests::InsertRequest; use crate::error::{ - ColumnAlreadyExistsSnafu, ColumnDataTypeSnafu, CreateVectorSnafu, - DuplicatedTimestampColumnSnafu, InvalidColumnProtoSnafu, MissingTimestampColumnSnafu, Result, + ColumnAlreadyExistsSnafu, ColumnDataTypeSnafu, CreateVectorSnafu, Result, UnexpectedValuesLengthSnafu, }; -const TAG_SEMANTIC_TYPE: i32 = SemanticType::Tag as i32; -const TIMESTAMP_SEMANTIC_TYPE: i32 = SemanticType::Timestamp as i32; - -#[inline] -fn build_column_def(column_name: &str, datatype: i32, nullable: bool) -> ColumnDef { - ColumnDef { - name: column_name.to_string(), - datatype, - is_nullable: nullable, - default_constraint: vec![], - } -} +use crate::util; +use crate::util::ColumnExpr; pub fn find_new_columns(schema: &SchemaRef, columns: &[Column]) -> Result> { - let mut columns_to_add = Vec::default(); - let mut new_columns: HashSet = HashSet::default(); - - for Column { - column_name, - semantic_type, - datatype, - .. - } in columns - { - if schema.column_schema_by_name(column_name).is_none() && !new_columns.contains(column_name) - { - let column_def = Some(build_column_def(column_name, *datatype, true)); - columns_to_add.push(AddColumn { - column_def, - is_key: *semantic_type == TAG_SEMANTIC_TYPE, - location: None, - }); - let _ = new_columns.insert(column_name.to_string()); - } - } - - if columns_to_add.is_empty() { - Ok(None) - } else { - Ok(Some(AddColumns { - add_columns: columns_to_add, - })) - } -} - -pub fn column_to_vector(column: &Column, rows: u32) -> Result { - let wrapper = ColumnDataTypeWrapper::try_new(column.datatype).context(ColumnDataTypeSnafu)?; - let column_datatype = wrapper.datatype(); - - let rows = rows as usize; - let mut vector = ConcreteDataType::from(wrapper).create_mutable_vector(rows); - - if let Some(values) = &column.values { - let values = collect_column_values(column_datatype, values); - let mut values_iter = values.into_iter(); - - let null_mask = BitVec::from_slice(&column.null_mask); - let mut nulls_iter = null_mask.iter().by_vals().fuse(); - - for i in 0..rows { - if let Some(true) = nulls_iter.next() { - vector.push_null(); - } else { - let value_ref = values_iter - .next() - .with_context(|| InvalidColumnProtoSnafu { - err_msg: format!( - "value not found at position {} of column {}", - i, &column.column_name - ), - })?; - vector - .try_push_value_ref(value_ref) - .context(CreateVectorSnafu)?; - } - } - } else { - (0..rows).for_each(|_| vector.push_null()); - } - Ok(vector.to_vector()) -} - -fn collect_column_values(column_datatype: ColumnDataType, values: &Values) -> Vec { - macro_rules! collect_values { - ($value: expr, $mapper: expr) => { - $value.iter().map($mapper).collect::>() - }; - } - - match column_datatype { - ColumnDataType::Boolean => collect_values!(values.bool_values, |v| ValueRef::from(*v)), - ColumnDataType::Int8 => collect_values!(values.i8_values, |v| ValueRef::from(*v as i8)), - ColumnDataType::Int16 => { - collect_values!(values.i16_values, |v| ValueRef::from(*v as i16)) - } - ColumnDataType::Int32 => { - collect_values!(values.i32_values, |v| ValueRef::from(*v)) - } - ColumnDataType::Int64 => { - collect_values!(values.i64_values, |v| ValueRef::from(*v)) - } - ColumnDataType::Uint8 => { - collect_values!(values.u8_values, |v| ValueRef::from(*v as u8)) - } - ColumnDataType::Uint16 => { - collect_values!(values.u16_values, |v| ValueRef::from(*v as u16)) - } - ColumnDataType::Uint32 => { - collect_values!(values.u32_values, |v| ValueRef::from(*v)) - } - ColumnDataType::Uint64 => { - collect_values!(values.u64_values, |v| ValueRef::from(*v)) - } - ColumnDataType::Float32 => collect_values!(values.f32_values, |v| ValueRef::from(*v)), - ColumnDataType::Float64 => collect_values!(values.f64_values, |v| ValueRef::from(*v)), - ColumnDataType::Binary => { - collect_values!(values.binary_values, |v| ValueRef::from(v.as_slice())) - } - ColumnDataType::String => { - collect_values!(values.string_values, |v| ValueRef::from(v.as_str())) - } - ColumnDataType::Date => { - collect_values!(values.date_values, |v| ValueRef::Date(Date::new(*v))) - } - ColumnDataType::Datetime => { - collect_values!(values.datetime_values, |v| ValueRef::DateTime( - DateTime::new(*v) - )) - } - ColumnDataType::TimestampSecond => { - collect_values!(values.ts_second_values, |v| ValueRef::Timestamp( - Timestamp::new_second(*v) - )) - } - ColumnDataType::TimestampMillisecond => { - collect_values!(values.ts_millisecond_values, |v| ValueRef::Timestamp( - Timestamp::new_millisecond(*v) - )) - } - ColumnDataType::TimestampMicrosecond => { - collect_values!(values.ts_millisecond_values, |v| ValueRef::Timestamp( - Timestamp::new_microsecond(*v) - )) - } - ColumnDataType::TimestampNanosecond => { - collect_values!(values.ts_millisecond_values, |v| ValueRef::Timestamp( - Timestamp::new_nanosecond(*v) - )) - } - ColumnDataType::TimeSecond => { - collect_values!(values.time_second_values, |v| ValueRef::Time( - Time::new_second(*v) - )) - } - ColumnDataType::TimeMillisecond => { - collect_values!(values.time_millisecond_values, |v| ValueRef::Time( - Time::new_millisecond(*v) - )) - } - ColumnDataType::TimeMicrosecond => { - collect_values!(values.time_millisecond_values, |v| ValueRef::Time( - Time::new_microsecond(*v) - )) - } - ColumnDataType::TimeNanosecond => { - collect_values!(values.time_millisecond_values, |v| ValueRef::Time( - Time::new_nanosecond(*v) - )) - } - ColumnDataType::IntervalYearMonth => { - collect_values!(values.interval_year_month_values, |v| { - ValueRef::Interval(Interval::from_i32(*v)) - }) - } - ColumnDataType::IntervalDayTime => { - collect_values!(values.interval_day_time_values, |v| { - ValueRef::Interval(Interval::from_i64(*v)) - }) - } - ColumnDataType::IntervalMonthDayNano => { - collect_values!(values.interval_month_day_nano_values, |v| { - ValueRef::Interval(Interval::from_month_day_nano( - v.months, - v.days, - v.nanoseconds, - )) - }) - } - } + let column_exprs = ColumnExpr::from_columns(columns); + util::extract_new_columns(schema, column_exprs) } /// Try to build create table request from insert data. @@ -250,70 +47,16 @@ pub fn build_create_expr_from_insertion( columns: &[Column], engine: &str, ) -> Result { - let mut new_columns: HashSet = HashSet::default(); - let mut column_defs = Vec::default(); - let mut primary_key_indices = Vec::default(); - let mut timestamp_index = usize::MAX; - - for Column { - column_name, - semantic_type, - datatype, - .. - } in columns - { - if !new_columns.contains(column_name) { - let mut is_nullable = true; - match *semantic_type { - TAG_SEMANTIC_TYPE => primary_key_indices.push(column_defs.len()), - TIMESTAMP_SEMANTIC_TYPE => { - ensure!( - timestamp_index == usize::MAX, - DuplicatedTimestampColumnSnafu { - exists: &columns[timestamp_index].column_name, - duplicated: column_name, - } - ); - timestamp_index = column_defs.len(); - // Timestamp column must not be null. - is_nullable = false; - } - _ => {} - } - - let column_def = build_column_def(column_name, *datatype, is_nullable); - column_defs.push(column_def); - let _ = new_columns.insert(column_name.to_string()); - } - } - - ensure!( - timestamp_index != usize::MAX, - MissingTimestampColumnSnafu { msg: table_name } - ); - let timestamp_field_name = columns[timestamp_index].column_name.clone(); - - let primary_keys = primary_key_indices - .iter() - .map(|idx| columns[*idx].column_name.clone()) - .collect::>(); - - let expr = CreateTableExpr { - catalog_name: catalog_name.to_string(), - schema_name: schema_name.to_string(), - table_name: table_name.to_string(), - desc: "Created on insertion".to_string(), - column_defs, - time_index: timestamp_field_name, - primary_keys, - create_if_not_exists: true, - table_options: Default::default(), - table_id: table_id.map(|id| api::v1::TableId { id }), - region_numbers: vec![0], // TODO:(hl): region number should be allocated by frontend - engine: engine.to_string(), - }; - - Ok(expr) + let column_exprs = ColumnExpr::from_columns(columns); + util::build_create_table_expr( + catalog_name, + schema_name, + table_id, + table_name, + column_exprs, + engine, + "Created on insertion", + ) } pub fn to_table_insert_request( @@ -364,10 +107,10 @@ pub(crate) fn add_values_to_builder( null_mask: Vec, ) -> Result { if null_mask.is_empty() { - Ok(values_to_vector(&data_type, values)) + Ok(helper::pb_values_to_vector_ref(&data_type, values)) } else { let builder = &mut data_type.create_mutable_vector(row_count); - let values = convert_values(&data_type, values); + let values = helper::pb_values_to_values(&data_type, values); let null_mask = BitVec::from_vec(null_mask); ensure!( null_mask.count_ones() + values.len() == row_count, @@ -392,231 +135,6 @@ pub(crate) fn add_values_to_builder( } } -fn values_to_vector(data_type: &ConcreteDataType, values: Values) -> VectorRef { - match data_type { - ConcreteDataType::Boolean(_) => Arc::new(BooleanVector::from(values.bool_values)), - ConcreteDataType::Int8(_) => Arc::new(PrimitiveVector::::from_iter_values( - values.i8_values.into_iter().map(|x| x as i8), - )), - ConcreteDataType::Int16(_) => Arc::new(PrimitiveVector::::from_iter_values( - values.i16_values.into_iter().map(|x| x as i16), - )), - ConcreteDataType::Int32(_) => Arc::new(Int32Vector::from_vec(values.i32_values)), - ConcreteDataType::Int64(_) => Arc::new(Int64Vector::from_vec(values.i64_values)), - ConcreteDataType::UInt8(_) => Arc::new(PrimitiveVector::::from_iter_values( - values.u8_values.into_iter().map(|x| x as u8), - )), - ConcreteDataType::UInt16(_) => Arc::new(PrimitiveVector::::from_iter_values( - values.u16_values.into_iter().map(|x| x as u16), - )), - ConcreteDataType::UInt32(_) => Arc::new(UInt32Vector::from_vec(values.u32_values)), - ConcreteDataType::UInt64(_) => Arc::new(UInt64Vector::from_vec(values.u64_values)), - ConcreteDataType::Float32(_) => Arc::new(Float32Vector::from_vec(values.f32_values)), - ConcreteDataType::Float64(_) => Arc::new(Float64Vector::from_vec(values.f64_values)), - ConcreteDataType::Binary(_) => Arc::new(BinaryVector::from(values.binary_values)), - ConcreteDataType::String(_) => Arc::new(StringVector::from_vec(values.string_values)), - ConcreteDataType::Date(_) => Arc::new(DateVector::from_vec(values.date_values)), - ConcreteDataType::DateTime(_) => Arc::new(DateTimeVector::from_vec(values.datetime_values)), - ConcreteDataType::Timestamp(unit) => match unit { - TimestampType::Second(_) => { - Arc::new(TimestampSecondVector::from_vec(values.ts_second_values)) - } - TimestampType::Millisecond(_) => Arc::new(TimestampMillisecondVector::from_vec( - values.ts_millisecond_values, - )), - TimestampType::Microsecond(_) => Arc::new(TimestampMicrosecondVector::from_vec( - values.ts_microsecond_values, - )), - TimestampType::Nanosecond(_) => Arc::new(TimestampNanosecondVector::from_vec( - values.ts_nanosecond_values, - )), - }, - ConcreteDataType::Time(unit) => match unit { - TimeType::Second(_) => Arc::new(TimeSecondVector::from_iter_values( - values.time_second_values.iter().map(|x| *x as i32), - )), - TimeType::Millisecond(_) => Arc::new(TimeMillisecondVector::from_iter_values( - values.time_millisecond_values.iter().map(|x| *x as i32), - )), - TimeType::Microsecond(_) => Arc::new(TimeMicrosecondVector::from_vec( - values.time_microsecond_values, - )), - TimeType::Nanosecond(_) => Arc::new(TimeNanosecondVector::from_vec( - values.time_nanosecond_values, - )), - }, - - ConcreteDataType::Interval(unit) => match unit { - IntervalType::YearMonth(_) => Arc::new(IntervalYearMonthVector::from_vec( - values.interval_year_month_values, - )), - IntervalType::DayTime(_) => Arc::new(IntervalDayTimeVector::from_vec( - values.interval_day_time_values, - )), - IntervalType::MonthDayNano(_) => { - Arc::new(IntervalMonthDayNanoVector::from_iter_values( - values.interval_month_day_nano_values.iter().map(|x| { - Interval::from_month_day_nano(x.months, x.days, x.nanoseconds).to_i128() - }), - )) - } - }, - ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { - unreachable!() - } - } -} - -fn convert_values(data_type: &ConcreteDataType, values: Values) -> Vec { - // TODO(fys): use macros to optimize code - match data_type { - ConcreteDataType::Int64(_) => values - .i64_values - .into_iter() - .map(|val| val.into()) - .collect(), - ConcreteDataType::Float64(_) => values - .f64_values - .into_iter() - .map(|val| val.into()) - .collect(), - ConcreteDataType::String(_) => values - .string_values - .into_iter() - .map(|val| val.into()) - .collect(), - ConcreteDataType::Boolean(_) => values - .bool_values - .into_iter() - .map(|val| val.into()) - .collect(), - ConcreteDataType::Int8(_) => values - .i8_values - .into_iter() - // Safety: Since i32 only stores i8 data here, so i32 as i8 is safe. - .map(|val| (val as i8).into()) - .collect(), - ConcreteDataType::Int16(_) => values - .i16_values - .into_iter() - // Safety: Since i32 only stores i16 data here, so i32 as i16 is safe. - .map(|val| (val as i16).into()) - .collect(), - ConcreteDataType::Int32(_) => values - .i32_values - .into_iter() - .map(|val| val.into()) - .collect(), - ConcreteDataType::UInt8(_) => values - .u8_values - .into_iter() - // Safety: Since i32 only stores u8 data here, so i32 as u8 is safe. - .map(|val| (val as u8).into()) - .collect(), - ConcreteDataType::UInt16(_) => values - .u16_values - .into_iter() - // Safety: Since i32 only stores u16 data here, so i32 as u16 is safe. - .map(|val| (val as u16).into()) - .collect(), - ConcreteDataType::UInt32(_) => values - .u32_values - .into_iter() - .map(|val| val.into()) - .collect(), - ConcreteDataType::UInt64(_) => values - .u64_values - .into_iter() - .map(|val| val.into()) - .collect(), - ConcreteDataType::Float32(_) => values - .f32_values - .into_iter() - .map(|val| val.into()) - .collect(), - ConcreteDataType::Binary(_) => values - .binary_values - .into_iter() - .map(|val| val.into()) - .collect(), - ConcreteDataType::DateTime(_) => values - .datetime_values - .into_iter() - .map(|v| Value::DateTime(v.into())) - .collect(), - ConcreteDataType::Date(_) => values - .date_values - .into_iter() - .map(|v| Value::Date(v.into())) - .collect(), - ConcreteDataType::Timestamp(TimestampType::Second(_)) => values - .ts_second_values - .into_iter() - .map(|v| Value::Timestamp(Timestamp::new_second(v))) - .collect(), - ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => values - .ts_millisecond_values - .into_iter() - .map(|v| Value::Timestamp(Timestamp::new_millisecond(v))) - .collect(), - ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => values - .ts_microsecond_values - .into_iter() - .map(|v| Value::Timestamp(Timestamp::new_microsecond(v))) - .collect(), - ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => values - .ts_nanosecond_values - .into_iter() - .map(|v| Value::Timestamp(Timestamp::new_nanosecond(v))) - .collect(), - ConcreteDataType::Time(TimeType::Second(_)) => values - .time_second_values - .into_iter() - .map(|v| Value::Time(Time::new_second(v))) - .collect(), - ConcreteDataType::Time(TimeType::Millisecond(_)) => values - .time_millisecond_values - .into_iter() - .map(|v| Value::Time(Time::new_millisecond(v))) - .collect(), - ConcreteDataType::Time(TimeType::Microsecond(_)) => values - .time_microsecond_values - .into_iter() - .map(|v| Value::Time(Time::new_microsecond(v))) - .collect(), - ConcreteDataType::Time(TimeType::Nanosecond(_)) => values - .time_nanosecond_values - .into_iter() - .map(|v| Value::Time(Time::new_nanosecond(v))) - .collect(), - - ConcreteDataType::Interval(IntervalType::YearMonth(_)) => values - .interval_year_month_values - .into_iter() - .map(|v| Value::Interval(Interval::from_i32(v))) - .collect(), - ConcreteDataType::Interval(IntervalType::DayTime(_)) => values - .interval_day_time_values - .into_iter() - .map(|v| Value::Interval(Interval::from_i64(v))) - .collect(), - ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => values - .interval_month_day_nano_values - .into_iter() - .map(|v| { - Value::Interval(Interval::from_month_day_nano( - v.months, - v.days, - v.nanoseconds, - )) - }) - .collect(), - ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => { - unreachable!() - } - } -} - fn is_null(null_mask: &BitVec, idx: usize) -> Option { null_mask.get(idx).as_deref().copied() } @@ -635,12 +153,7 @@ mod tests { use common_time::timestamp::{TimeUnit, Timestamp}; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder}; - use datatypes::types::{ - IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, TimeMillisecondType, - TimeSecondType, TimeType, TimestampMillisecondType, TimestampSecondType, TimestampType, - }; use datatypes::value::Value; - use paste::paste; use snafu::ResultExt; use super::*; @@ -881,280 +394,6 @@ mod tests { assert_eq!(Value::Timestamp(Timestamp::new_millisecond(101)), ts.get(1)); } - macro_rules! test_convert_values { - ($grpc_data_type: ident, $values: expr, $concrete_data_type: ident, $expected_ret: expr) => { - paste! { - #[test] - fn []() { - let values = Values { - [<$grpc_data_type _values>]: $values, - ..Default::default() - }; - - let data_type = ConcreteDataType::[<$concrete_data_type _datatype>](); - let result = convert_values(&data_type, values); - - assert_eq!( - $expected_ret, - result - ); - } - } - }; - } - - test_convert_values!( - i8, - vec![1_i32, 2, 3], - int8, - vec![Value::Int8(1), Value::Int8(2), Value::Int8(3)] - ); - - test_convert_values!( - u8, - vec![1_u32, 2, 3], - uint8, - vec![Value::UInt8(1), Value::UInt8(2), Value::UInt8(3)] - ); - - test_convert_values!( - i16, - vec![1_i32, 2, 3], - int16, - vec![Value::Int16(1), Value::Int16(2), Value::Int16(3)] - ); - - test_convert_values!( - u16, - vec![1_u32, 2, 3], - uint16, - vec![Value::UInt16(1), Value::UInt16(2), Value::UInt16(3)] - ); - - test_convert_values!( - i32, - vec![1, 2, 3], - int32, - vec![Value::Int32(1), Value::Int32(2), Value::Int32(3)] - ); - - test_convert_values!( - u32, - vec![1, 2, 3], - uint32, - vec![Value::UInt32(1), Value::UInt32(2), Value::UInt32(3)] - ); - - test_convert_values!( - i64, - vec![1, 2, 3], - int64, - vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)] - ); - - test_convert_values!( - u64, - vec![1, 2, 3], - uint64, - vec![Value::UInt64(1), Value::UInt64(2), Value::UInt64(3)] - ); - - test_convert_values!( - f32, - vec![1.0, 2.0, 3.0], - float32, - vec![ - Value::Float32(1.0.into()), - Value::Float32(2.0.into()), - Value::Float32(3.0.into()) - ] - ); - - test_convert_values!( - f64, - vec![1.0, 2.0, 3.0], - float64, - vec![ - Value::Float64(1.0.into()), - Value::Float64(2.0.into()), - Value::Float64(3.0.into()) - ] - ); - - test_convert_values!( - string, - vec!["1".to_string(), "2".to_string(), "3".to_string()], - string, - vec![ - Value::String("1".into()), - Value::String("2".into()), - Value::String("3".into()) - ] - ); - - test_convert_values!( - binary, - vec!["1".into(), "2".into(), "3".into()], - binary, - vec![ - Value::Binary(b"1".to_vec().into()), - Value::Binary(b"2".to_vec().into()), - Value::Binary(b"3".to_vec().into()) - ] - ); - - test_convert_values!( - date, - vec![1, 2, 3], - date, - vec![ - Value::Date(1.into()), - Value::Date(2.into()), - Value::Date(3.into()) - ] - ); - - test_convert_values!( - datetime, - vec![1.into(), 2.into(), 3.into()], - datetime, - vec![ - Value::DateTime(1.into()), - Value::DateTime(2.into()), - Value::DateTime(3.into()) - ] - ); - - #[test] - fn test_convert_timestamp_values() { - // second - let actual = convert_values( - &ConcreteDataType::Timestamp(TimestampType::Second(TimestampSecondType)), - Values { - ts_second_values: vec![1_i64, 2_i64, 3_i64], - ..Default::default() - }, - ); - let expect = vec![ - Value::Timestamp(Timestamp::new_second(1_i64)), - Value::Timestamp(Timestamp::new_second(2_i64)), - Value::Timestamp(Timestamp::new_second(3_i64)), - ]; - assert_eq!(expect, actual); - - // millisecond - let actual = convert_values( - &ConcreteDataType::Timestamp(TimestampType::Millisecond(TimestampMillisecondType)), - Values { - ts_millisecond_values: vec![1_i64, 2_i64, 3_i64], - ..Default::default() - }, - ); - let expect = vec![ - Value::Timestamp(Timestamp::new_millisecond(1_i64)), - Value::Timestamp(Timestamp::new_millisecond(2_i64)), - Value::Timestamp(Timestamp::new_millisecond(3_i64)), - ]; - assert_eq!(expect, actual); - } - - #[test] - fn test_convert_time_values() { - // second - let actual = convert_values( - &ConcreteDataType::Time(TimeType::Second(TimeSecondType)), - Values { - time_second_values: vec![1_i64, 2_i64, 3_i64], - ..Default::default() - }, - ); - let expect = vec![ - Value::Time(Time::new_second(1_i64)), - Value::Time(Time::new_second(2_i64)), - Value::Time(Time::new_second(3_i64)), - ]; - assert_eq!(expect, actual); - - // millisecond - let actual = convert_values( - &ConcreteDataType::Time(TimeType::Millisecond(TimeMillisecondType)), - Values { - time_millisecond_values: vec![1_i64, 2_i64, 3_i64], - ..Default::default() - }, - ); - let expect = vec![ - Value::Time(Time::new_millisecond(1_i64)), - Value::Time(Time::new_millisecond(2_i64)), - Value::Time(Time::new_millisecond(3_i64)), - ]; - assert_eq!(expect, actual); - } - - #[test] - fn test_convert_interval_values() { - // year_month - let actual = convert_values( - &ConcreteDataType::Interval(IntervalType::YearMonth(IntervalYearMonthType)), - Values { - interval_year_month_values: vec![1_i32, 2_i32, 3_i32], - ..Default::default() - }, - ); - let expect = vec![ - Value::Interval(Interval::from_year_month(1_i32)), - Value::Interval(Interval::from_year_month(2_i32)), - Value::Interval(Interval::from_year_month(3_i32)), - ]; - assert_eq!(expect, actual); - - // day_time - let actual = convert_values( - &ConcreteDataType::Interval(IntervalType::DayTime(IntervalDayTimeType)), - Values { - interval_day_time_values: vec![1_i64, 2_i64, 3_i64], - ..Default::default() - }, - ); - let expect = vec![ - Value::Interval(Interval::from_i64(1_i64)), - Value::Interval(Interval::from_i64(2_i64)), - Value::Interval(Interval::from_i64(3_i64)), - ]; - assert_eq!(expect, actual); - - // month_day_nano - let actual = convert_values( - &ConcreteDataType::Interval(IntervalType::MonthDayNano(IntervalMonthDayNanoType)), - Values { - interval_month_day_nano_values: vec![ - IntervalMonthDayNano { - months: 1, - days: 2, - nanoseconds: 3, - }, - IntervalMonthDayNano { - months: 5, - days: 6, - nanoseconds: 7, - }, - IntervalMonthDayNano { - months: 9, - days: 10, - nanoseconds: 11, - }, - ], - ..Default::default() - }, - ); - let expect = vec![ - Value::Interval(Interval::from_month_day_nano(1, 2, 3)), - Value::Interval(Interval::from_month_day_nano(5, 6, 7)), - Value::Interval(Interval::from_month_day_nano(9, 10, 11)), - ]; - assert_eq!(expect, actual); - } - #[test] fn test_is_null() { let null_mask = BitVec::from_slice(&[0b0000_0001, 0b0000_1000]); @@ -1178,7 +417,7 @@ mod tests { }; let host_column = Column { column_name: "host".to_string(), - semantic_type: TAG_SEMANTIC_TYPE, + semantic_type: SemanticType::Tag as i32, values: Some(host_vals), null_mask: vec![0], datatype: ColumnDataType::String as i32, @@ -1248,7 +487,7 @@ mod tests { }; let ts_column = Column { column_name: "ts".to_string(), - semantic_type: TIMESTAMP_SEMANTIC_TYPE, + semantic_type: SemanticType::Timestamp as i32, values: Some(ts_vals), null_mask: vec![0], datatype: ColumnDataType::TimestampMillisecond as i32, diff --git a/src/common/grpc-expr/src/lib.rs b/src/common/grpc-expr/src/lib.rs index e46b2ad400..61da4fe195 100644 --- a/src/common/grpc-expr/src/lib.rs +++ b/src/common/grpc-expr/src/lib.rs @@ -16,6 +16,7 @@ mod alter; pub mod delete; pub mod error; pub mod insert; +mod util; pub use alter::{alter_expr_to_request, create_expr_to_request, create_table_schema}; -pub use insert::{build_create_expr_from_insertion, column_to_vector, find_new_columns}; +pub use insert::{build_create_expr_from_insertion, find_new_columns}; diff --git a/src/common/grpc-expr/src/util.rs b/src/common/grpc-expr/src/util.rs new file mode 100644 index 0000000000..89e6b2dd27 --- /dev/null +++ b/src/common/grpc-expr/src/util.rs @@ -0,0 +1,184 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use api::v1::{ + AddColumn, AddColumns, Column, ColumnDef, ColumnSchema, CreateTableExpr, SemanticType, +}; +use datatypes::schema::Schema; +use snafu::{ensure, OptionExt}; +use table::metadata::TableId; + +use crate::error::{ + DuplicatedColumnNameSnafu, DuplicatedTimestampColumnSnafu, MissingTimestampColumnSnafu, Result, +}; + +pub struct ColumnExpr { + pub column_name: String, + pub datatype: i32, + pub semantic_type: i32, +} + +impl ColumnExpr { + #[inline] + pub fn from_columns(columns: &[Column]) -> Vec { + columns.iter().map(Self::from).collect() + } +} + +impl From<&Column> for ColumnExpr { + fn from(column: &Column) -> Self { + Self { + column_name: column.column_name.clone(), + datatype: column.datatype, + semantic_type: column.semantic_type, + } + } +} + +impl From<&ColumnSchema> for ColumnExpr { + fn from(schema: &ColumnSchema) -> Self { + Self { + column_name: schema.column_name.clone(), + datatype: schema.datatype, + semantic_type: schema.semantic_type, + } + } +} + +pub fn build_create_table_expr( + catalog_name: &str, + schema_name: &str, + table_id: Option, + table_name: &str, + column_exprs: Vec, + engine: &str, + desc: &str, +) -> Result { + // Check for duplicate names. If found, raise an error. + // + // The introduction of hashset incurs additional memory overhead + // but achieves a time complexity of O(1). + // + // The separate iteration over `column_exprs` is because the CPU prefers + // smaller loops, and avoid cloning String. + let mut distinct_names = HashSet::with_capacity(column_exprs.len()); + for ColumnExpr { column_name, .. } in &column_exprs { + ensure!( + distinct_names.insert(column_name), + DuplicatedColumnNameSnafu { name: column_name } + ); + } + + let mut column_defs = Vec::with_capacity(column_exprs.len()); + let mut primary_keys = Vec::default(); + let mut time_index = None; + + for ColumnExpr { + column_name, + datatype, + semantic_type, + } in column_exprs + { + let mut is_nullable = true; + match semantic_type { + v if v == SemanticType::Tag as i32 => primary_keys.push(column_name.clone()), + v if v == SemanticType::Timestamp as i32 => { + ensure!( + time_index.is_none(), + DuplicatedTimestampColumnSnafu { + exists: time_index.unwrap(), + duplicated: &column_name, + } + ); + time_index = Some(column_name.clone()); + // Timestamp column must not be null. + is_nullable = false; + } + _ => {} + } + + let column_def = ColumnDef { + name: column_name, + datatype, + is_nullable, + default_constraint: vec![], + }; + column_defs.push(column_def); + } + + let time_index = time_index.context(MissingTimestampColumnSnafu { + msg: format!("table is {}", table_name), + })?; + + let expr = CreateTableExpr { + catalog_name: catalog_name.to_string(), + schema_name: schema_name.to_string(), + table_name: table_name.to_string(), + desc: desc.to_string(), + column_defs, + time_index, + primary_keys, + create_if_not_exists: true, + table_options: Default::default(), + table_id: table_id.map(|id| api::v1::TableId { id }), + // TODO(hl): region number should be allocated by frontend + region_numbers: vec![0], + engine: engine.to_string(), + }; + + Ok(expr) +} + +pub fn extract_new_columns( + schema: &Schema, + column_exprs: Vec, +) -> Result> { + let columns_to_add = column_exprs + .into_iter() + .filter(|expr| schema.column_schema_by_name(&expr.column_name).is_none()) + .map(|expr| { + let is_key = expr.semantic_type == SemanticType::Tag as i32; + let column_def = Some(ColumnDef { + name: expr.column_name, + datatype: expr.datatype, + is_nullable: true, + default_constraint: vec![], + }); + AddColumn { + column_def, + is_key, + location: None, + } + }) + .collect::>(); + + if columns_to_add.is_empty() { + Ok(None) + } else { + let mut distinct_names = HashSet::with_capacity(columns_to_add.len()); + for add_column in &columns_to_add { + let name = &add_column.column_def.as_ref().unwrap().name; + ensure!( + distinct_names.insert(name), + DuplicatedColumnNameSnafu { name } + ); + } + + Ok(Some(AddColumns { + add_columns: columns_to_add, + })) + } +} diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index f8dcff5891..f857e13f3f 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -232,7 +232,7 @@ impl GrpcQueryHandler for Instance { self.handle_query(query, ctx).await } Request::Ddl(request) => self.handle_ddl(request, ctx).await, - Request::RowInserts(_) | Request::RowDelete(_) => UnsupportedGrpcRequestSnafu { + Request::RowInserts(_) | Request::RowDeletes(_) => UnsupportedGrpcRequestSnafu { kind: "row insert/delete", } .fail(), diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 67f8abd4af..4f8d39d0b8 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -665,7 +665,7 @@ impl GrpcQueryHandler for DistInstance { async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result { match request { Request::Inserts(requests) => self.handle_dist_insert(requests, ctx).await, - Request::RowInserts(_) | Request::RowDelete(_) => NotSupportedSnafu { + Request::RowInserts(_) | Request::RowDeletes(_) => NotSupportedSnafu { feat: "row insert/delete", } .fail(), diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 93254b9312..9a3b1c06b1 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -44,7 +44,7 @@ impl GrpcQueryHandler for Instance { let output = match request { Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?, - Request::RowInserts(_) | Request::RowDelete(_) => { + Request::RowInserts(_) | Request::RowDeletes(_) => { return NotSupportedSnafu { feat: "row insert/delete", } diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 3502807644..d10fc1b718 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -162,7 +162,7 @@ impl GrpcQueryHandler for DummyInstance { Request::Inserts(_) | Request::Deletes(_) | Request::RowInserts(_) - | Request::RowDelete(_) => unimplemented!(), + | Request::RowDeletes(_) => unimplemented!(), Request::Query(query_request) => { let query = query_request.query.unwrap(); match query {