refactor: grpc insert (#2188)

* feat: interval type for row protocol

* feat: minor refactor grpc insert

* Update src/common/grpc-expr/src/util.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* fix: by comment

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
JeremyHi
2023-08-16 19:25:25 +08:00
committed by GitHub
parent bb062003ef
commit 6cd7319d67
13 changed files with 796 additions and 949 deletions

3
Cargo.lock generated
View File

@@ -212,6 +212,7 @@ dependencies = [
"common-time",
"datatypes",
"greptime-proto",
"paste",
"prost",
"snafu",
"tonic 0.9.2",
@@ -4136,7 +4137,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/NiwakaDev/greptime-proto.git?rev=ec402b6500f908a0acfab6c889225cd4dc2228a4#ec402b6500f908a0acfab6c889225cd4dc2228a4"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=fa79839bbb304e0e7097e1cc4bcac9a63b3e496a#fa79839bbb304e0e7097e1cc4bcac9a63b3e496a"
dependencies = [
"prost",
"serde",

View File

@@ -77,7 +77,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
derive_builder = "0.12"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/NiwakaDev/greptime-proto.git", rev = "ec402b6500f908a0acfab6c889225cd4dc2228a4" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "fa79839bbb304e0e7097e1cc4bcac9a63b3e496a" }
itertools = "0.10"
lazy_static = "1.4"
once_cell = "1.18"

View File

@@ -16,3 +16,6 @@ tonic.workspace = true
[build-dependencies]
tonic-build = "0.9"
[dev-dependencies]
paste = "1.0"

View File

@@ -12,15 +12,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_base::BitVec;
use common_time::interval::IntervalUnit;
use common_time::time::Time;
use common_time::timestamp::TimeUnit;
use common_time::{Date, DateTime, Interval, Timestamp};
use datatypes::prelude::{ConcreteDataType, ValueRef};
use datatypes::types::{IntervalType, TimeType, TimestampType};
use datatypes::scalars::ScalarVector;
use datatypes::types::{
Int16Type, Int8Type, IntervalType, TimeType, TimestampType, UInt16Type, UInt8Type,
};
use datatypes::value::{OrderedF32, OrderedF64, Value};
use datatypes::vectors::VectorRef;
use datatypes::vectors::{
BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector,
Int32Vector, Int64Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector,
IntervalYearMonthVector, PrimitiveVector, StringVector, TimeMicrosecondVector,
TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector,
TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt32Vector,
UInt64Vector, VectorRef,
};
use greptime_proto::v1;
use greptime_proto::v1::ddl_request::Expr;
use greptime_proto::v1::greptime_request::Request;
@@ -300,9 +312,9 @@ pub fn request_type(request: &Request) -> &'static str {
Request::Inserts(_) => "inserts",
Request::Query(query_req) => query_request_type(query_req),
Request::Ddl(ddl_req) => ddl_request_type(ddl_req),
Request::RowInserts(_) => "row_inserts",
Request::RowDelete(_) => "row_delete",
Request::Deletes(_) => "deletes",
Request::RowInserts(_) => "row_inserts",
Request::RowDeletes(_) => "row_deletes",
}
}
@@ -341,60 +353,265 @@ pub fn convert_i128_to_interval(v: i128) -> IntervalMonthDayNano {
}
}
pub fn pb_value_to_value_ref(value: &greptime_proto::v1::Value) -> ValueRef {
pub fn pb_value_to_value_ref(value: &v1::Value) -> ValueRef {
let Some(value) = &value.value_data else {
return ValueRef::Null;
};
match value {
greptime_proto::v1::value::ValueData::I8Value(v) => ValueRef::Int8(*v as i8),
greptime_proto::v1::value::ValueData::I16Value(v) => ValueRef::Int16(*v as i16),
greptime_proto::v1::value::ValueData::I32Value(v) => ValueRef::Int32(*v),
greptime_proto::v1::value::ValueData::I64Value(v) => ValueRef::Int64(*v),
greptime_proto::v1::value::ValueData::U8Value(v) => ValueRef::UInt8(*v as u8),
greptime_proto::v1::value::ValueData::U16Value(v) => ValueRef::UInt16(*v as u16),
greptime_proto::v1::value::ValueData::U32Value(v) => ValueRef::UInt32(*v),
greptime_proto::v1::value::ValueData::U64Value(v) => ValueRef::UInt64(*v),
greptime_proto::v1::value::ValueData::F32Value(f) => {
ValueRef::Float32(OrderedF32::from(*f))
ValueData::I8Value(v) => ValueRef::Int8(*v as i8),
ValueData::I16Value(v) => ValueRef::Int16(*v as i16),
ValueData::I32Value(v) => ValueRef::Int32(*v),
ValueData::I64Value(v) => ValueRef::Int64(*v),
ValueData::U8Value(v) => ValueRef::UInt8(*v as u8),
ValueData::U16Value(v) => ValueRef::UInt16(*v as u16),
ValueData::U32Value(v) => ValueRef::UInt32(*v),
ValueData::U64Value(v) => ValueRef::UInt64(*v),
ValueData::F32Value(f) => ValueRef::Float32(OrderedF32::from(*f)),
ValueData::F64Value(f) => ValueRef::Float64(OrderedF64::from(*f)),
ValueData::BoolValue(b) => ValueRef::Boolean(*b),
ValueData::BinaryValue(bytes) => ValueRef::Binary(bytes.as_slice()),
ValueData::StringValue(string) => ValueRef::String(string.as_str()),
ValueData::DateValue(d) => ValueRef::Date(Date::from(*d)),
ValueData::DatetimeValue(d) => ValueRef::DateTime(DateTime::new(*d)),
ValueData::TsSecondValue(t) => ValueRef::Timestamp(Timestamp::new_second(*t)),
ValueData::TsMillisecondValue(t) => ValueRef::Timestamp(Timestamp::new_millisecond(*t)),
ValueData::TsMicrosecondValue(t) => ValueRef::Timestamp(Timestamp::new_microsecond(*t)),
ValueData::TsNanosecondValue(t) => ValueRef::Timestamp(Timestamp::new_nanosecond(*t)),
ValueData::TimeSecondValue(t) => ValueRef::Time(Time::new_second(*t)),
ValueData::TimeMillisecondValue(t) => ValueRef::Time(Time::new_millisecond(*t)),
ValueData::TimeMicrosecondValue(t) => ValueRef::Time(Time::new_microsecond(*t)),
ValueData::TimeNanosecondValue(t) => ValueRef::Time(Time::new_nanosecond(*t)),
ValueData::IntervalYearMonthValues(v) => ValueRef::Interval(Interval::from_i32(*v)),
ValueData::IntervalDayTimeValues(v) => ValueRef::Interval(Interval::from_i64(*v)),
ValueData::IntervalMonthDayNanoValues(v) => {
let interval = Interval::from_month_day_nano(v.months, v.days, v.nanoseconds);
ValueRef::Interval(interval)
}
greptime_proto::v1::value::ValueData::F64Value(f) => {
ValueRef::Float64(OrderedF64::from(*f))
}
}
pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) -> VectorRef {
match data_type {
ConcreteDataType::Boolean(_) => Arc::new(BooleanVector::from(values.bool_values)),
ConcreteDataType::Int8(_) => Arc::new(PrimitiveVector::<Int8Type>::from_iter_values(
values.i8_values.into_iter().map(|x| x as i8),
)),
ConcreteDataType::Int16(_) => Arc::new(PrimitiveVector::<Int16Type>::from_iter_values(
values.i16_values.into_iter().map(|x| x as i16),
)),
ConcreteDataType::Int32(_) => Arc::new(Int32Vector::from_vec(values.i32_values)),
ConcreteDataType::Int64(_) => Arc::new(Int64Vector::from_vec(values.i64_values)),
ConcreteDataType::UInt8(_) => Arc::new(PrimitiveVector::<UInt8Type>::from_iter_values(
values.u8_values.into_iter().map(|x| x as u8),
)),
ConcreteDataType::UInt16(_) => Arc::new(PrimitiveVector::<UInt16Type>::from_iter_values(
values.u16_values.into_iter().map(|x| x as u16),
)),
ConcreteDataType::UInt32(_) => Arc::new(UInt32Vector::from_vec(values.u32_values)),
ConcreteDataType::UInt64(_) => Arc::new(UInt64Vector::from_vec(values.u64_values)),
ConcreteDataType::Float32(_) => Arc::new(Float32Vector::from_vec(values.f32_values)),
ConcreteDataType::Float64(_) => Arc::new(Float64Vector::from_vec(values.f64_values)),
ConcreteDataType::Binary(_) => Arc::new(BinaryVector::from(values.binary_values)),
ConcreteDataType::String(_) => Arc::new(StringVector::from_vec(values.string_values)),
ConcreteDataType::Date(_) => Arc::new(DateVector::from_vec(values.date_values)),
ConcreteDataType::DateTime(_) => Arc::new(DateTimeVector::from_vec(values.datetime_values)),
ConcreteDataType::Timestamp(unit) => match unit {
TimestampType::Second(_) => {
Arc::new(TimestampSecondVector::from_vec(values.ts_second_values))
}
TimestampType::Millisecond(_) => Arc::new(TimestampMillisecondVector::from_vec(
values.ts_millisecond_values,
)),
TimestampType::Microsecond(_) => Arc::new(TimestampMicrosecondVector::from_vec(
values.ts_microsecond_values,
)),
TimestampType::Nanosecond(_) => Arc::new(TimestampNanosecondVector::from_vec(
values.ts_nanosecond_values,
)),
},
ConcreteDataType::Time(unit) => match unit {
TimeType::Second(_) => Arc::new(TimeSecondVector::from_iter_values(
values.time_second_values.iter().map(|x| *x as i32),
)),
TimeType::Millisecond(_) => Arc::new(TimeMillisecondVector::from_iter_values(
values.time_millisecond_values.iter().map(|x| *x as i32),
)),
TimeType::Microsecond(_) => Arc::new(TimeMicrosecondVector::from_vec(
values.time_microsecond_values,
)),
TimeType::Nanosecond(_) => Arc::new(TimeNanosecondVector::from_vec(
values.time_nanosecond_values,
)),
},
ConcreteDataType::Interval(unit) => match unit {
IntervalType::YearMonth(_) => Arc::new(IntervalYearMonthVector::from_vec(
values.interval_year_month_values,
)),
IntervalType::DayTime(_) => Arc::new(IntervalDayTimeVector::from_vec(
values.interval_day_time_values,
)),
IntervalType::MonthDayNano(_) => {
Arc::new(IntervalMonthDayNanoVector::from_iter_values(
values.interval_month_day_nano_values.iter().map(|x| {
Interval::from_month_day_nano(x.months, x.days, x.nanoseconds).to_i128()
}),
))
}
},
ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => {
unreachable!()
}
greptime_proto::v1::value::ValueData::BoolValue(b) => ValueRef::Boolean(*b),
greptime_proto::v1::value::ValueData::BinaryValue(bytes) => {
ValueRef::Binary(bytes.as_slice())
}
greptime_proto::v1::value::ValueData::StringValue(string) => {
ValueRef::String(string.as_str())
}
greptime_proto::v1::value::ValueData::DateValue(d) => ValueRef::Date(Date::from(*d)),
greptime_proto::v1::value::ValueData::DatetimeValue(d) => {
ValueRef::DateTime(DateTime::new(*d))
}
greptime_proto::v1::value::ValueData::TsSecondValue(t) => {
ValueRef::Timestamp(Timestamp::new_second(*t))
}
greptime_proto::v1::value::ValueData::TsMillisecondValue(t) => {
ValueRef::Timestamp(Timestamp::new_millisecond(*t))
}
greptime_proto::v1::value::ValueData::TsMicrosecondValue(t) => {
ValueRef::Timestamp(Timestamp::new_microsecond(*t))
}
greptime_proto::v1::value::ValueData::TsNanosecondValue(t) => {
ValueRef::Timestamp(Timestamp::new_nanosecond(*t))
}
greptime_proto::v1::value::ValueData::TimeSecondValue(t) => {
ValueRef::Time(Time::new_second(*t))
}
greptime_proto::v1::value::ValueData::TimeMillisecondValue(t) => {
ValueRef::Time(Time::new_millisecond(*t))
}
greptime_proto::v1::value::ValueData::TimeMicrosecondValue(t) => {
ValueRef::Time(Time::new_microsecond(*t))
}
greptime_proto::v1::value::ValueData::TimeNanosecondValue(t) => {
ValueRef::Time(Time::new_nanosecond(*t))
}
}
pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec<Value> {
// TODO(fys): use macros to optimize code
match data_type {
ConcreteDataType::Int64(_) => values
.i64_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::Float64(_) => values
.f64_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::String(_) => values
.string_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::Boolean(_) => values
.bool_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::Int8(_) => values
.i8_values
.into_iter()
// Safety: Since i32 only stores i8 data here, so i32 as i8 is safe.
.map(|val| (val as i8).into())
.collect(),
ConcreteDataType::Int16(_) => values
.i16_values
.into_iter()
// Safety: Since i32 only stores i16 data here, so i32 as i16 is safe.
.map(|val| (val as i16).into())
.collect(),
ConcreteDataType::Int32(_) => values
.i32_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::UInt8(_) => values
.u8_values
.into_iter()
// Safety: Since i32 only stores u8 data here, so i32 as u8 is safe.
.map(|val| (val as u8).into())
.collect(),
ConcreteDataType::UInt16(_) => values
.u16_values
.into_iter()
// Safety: Since i32 only stores u16 data here, so i32 as u16 is safe.
.map(|val| (val as u16).into())
.collect(),
ConcreteDataType::UInt32(_) => values
.u32_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::UInt64(_) => values
.u64_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::Float32(_) => values
.f32_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::Binary(_) => values
.binary_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::DateTime(_) => values
.datetime_values
.into_iter()
.map(|v| Value::DateTime(v.into()))
.collect(),
ConcreteDataType::Date(_) => values
.date_values
.into_iter()
.map(|v| Value::Date(v.into()))
.collect(),
ConcreteDataType::Timestamp(TimestampType::Second(_)) => values
.ts_second_values
.into_iter()
.map(|v| Value::Timestamp(Timestamp::new_second(v)))
.collect(),
ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => values
.ts_millisecond_values
.into_iter()
.map(|v| Value::Timestamp(Timestamp::new_millisecond(v)))
.collect(),
ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => values
.ts_microsecond_values
.into_iter()
.map(|v| Value::Timestamp(Timestamp::new_microsecond(v)))
.collect(),
ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => values
.ts_nanosecond_values
.into_iter()
.map(|v| Value::Timestamp(Timestamp::new_nanosecond(v)))
.collect(),
ConcreteDataType::Time(TimeType::Second(_)) => values
.time_second_values
.into_iter()
.map(|v| Value::Time(Time::new_second(v)))
.collect(),
ConcreteDataType::Time(TimeType::Millisecond(_)) => values
.time_millisecond_values
.into_iter()
.map(|v| Value::Time(Time::new_millisecond(v)))
.collect(),
ConcreteDataType::Time(TimeType::Microsecond(_)) => values
.time_microsecond_values
.into_iter()
.map(|v| Value::Time(Time::new_microsecond(v)))
.collect(),
ConcreteDataType::Time(TimeType::Nanosecond(_)) => values
.time_nanosecond_values
.into_iter()
.map(|v| Value::Time(Time::new_nanosecond(v)))
.collect(),
ConcreteDataType::Interval(IntervalType::YearMonth(_)) => values
.interval_year_month_values
.into_iter()
.map(|v| Value::Interval(Interval::from_i32(v)))
.collect(),
ConcreteDataType::Interval(IntervalType::DayTime(_)) => values
.interval_day_time_values
.into_iter()
.map(|v| Value::Interval(Interval::from_i64(v)))
.collect(),
ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => values
.interval_month_day_nano_values
.into_iter()
.map(|v| {
Value::Interval(Interval::from_month_day_nano(
v.months,
v.days,
v.nanoseconds,
))
})
.collect(),
ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => {
unreachable!()
}
}
}
@@ -478,19 +695,32 @@ pub fn to_proto_value(value: Value) -> Option<v1::Value> {
},
Value::Time(v) => match v.unit() {
TimeUnit::Second => v1::Value {
value_data: Some(v1::value::ValueData::TimeSecondValue(v.value())),
value_data: Some(ValueData::TimeSecondValue(v.value())),
},
TimeUnit::Millisecond => v1::Value {
value_data: Some(v1::value::ValueData::TimeMillisecondValue(v.value())),
value_data: Some(ValueData::TimeMillisecondValue(v.value())),
},
TimeUnit::Microsecond => v1::Value {
value_data: Some(v1::value::ValueData::TimeMicrosecondValue(v.value())),
value_data: Some(ValueData::TimeMicrosecondValue(v.value())),
},
TimeUnit::Nanosecond => v1::Value {
value_data: Some(v1::value::ValueData::TimeNanosecondValue(v.value())),
value_data: Some(ValueData::TimeNanosecondValue(v.value())),
},
},
Value::Interval(_) | Value::List(_) => return None,
Value::Interval(v) => match v.unit() {
IntervalUnit::YearMonth => v1::Value {
value_data: Some(ValueData::IntervalYearMonthValues(v.to_i32())),
},
IntervalUnit::DayTime => v1::Value {
value_data: Some(ValueData::IntervalDayTimeValues(v.to_i64())),
},
IntervalUnit::MonthDayNano => v1::Value {
value_data: Some(ValueData::IntervalMonthDayNanoValues(
convert_i128_to_interval(v.to_i128()),
)),
},
},
Value::List(_) => return None,
};
Some(proto_value)
@@ -500,8 +730,7 @@ pub fn to_proto_value(value: Value) -> Option<v1::Value> {
///
/// If value is null, returns `None`.
pub fn proto_value_type(value: &v1::Value) -> Option<ColumnDataType> {
let value_data = value.value_data.as_ref()?;
let value_type = match value_data {
let value_type = match value.value_data.as_ref()? {
ValueData::I8Value(_) => ColumnDataType::Int8,
ValueData::I16Value(_) => ColumnDataType::Int16,
ValueData::I32Value(_) => ColumnDataType::Int32,
@@ -525,6 +754,9 @@ pub fn proto_value_type(value: &v1::Value) -> Option<ColumnDataType> {
ValueData::TimeMillisecondValue(_) => ColumnDataType::TimeMillisecond,
ValueData::TimeMicrosecondValue(_) => ColumnDataType::TimeMicrosecond,
ValueData::TimeNanosecondValue(_) => ColumnDataType::TimeNanosecond,
ValueData::IntervalYearMonthValues(_) => ColumnDataType::IntervalYearMonth,
ValueData::IntervalDayTimeValues(_) => ColumnDataType::IntervalDayTime,
ValueData::IntervalMonthDayNanoValues(_) => ColumnDataType::IntervalMonthDayNano,
};
Some(value_type)
}
@@ -583,12 +815,17 @@ fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType
mod tests {
use std::sync::Arc;
use datatypes::types::{
IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, TimeMillisecondType,
TimeSecondType, TimestampMillisecondType, TimestampSecondType,
};
use datatypes::vectors::{
BooleanVector, IntervalDayTimeVector, IntervalMonthDayNanoVector, IntervalYearMonthVector,
TimeMicrosecondVector, TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector,
TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
TimestampSecondVector, Vector,
};
use paste::paste;
use super::*;
@@ -1009,4 +1246,278 @@ mod tests {
assert_eq!(interval.days, 0);
assert_eq!(interval.nanoseconds, 3000);
}
#[test]
fn test_convert_timestamp_values() {
// second
let actual = pb_values_to_values(
&ConcreteDataType::Timestamp(TimestampType::Second(TimestampSecondType)),
Values {
ts_second_values: vec![1_i64, 2_i64, 3_i64],
..Default::default()
},
);
let expect = vec![
Value::Timestamp(Timestamp::new_second(1_i64)),
Value::Timestamp(Timestamp::new_second(2_i64)),
Value::Timestamp(Timestamp::new_second(3_i64)),
];
assert_eq!(expect, actual);
// millisecond
let actual = pb_values_to_values(
&ConcreteDataType::Timestamp(TimestampType::Millisecond(TimestampMillisecondType)),
Values {
ts_millisecond_values: vec![1_i64, 2_i64, 3_i64],
..Default::default()
},
);
let expect = vec![
Value::Timestamp(Timestamp::new_millisecond(1_i64)),
Value::Timestamp(Timestamp::new_millisecond(2_i64)),
Value::Timestamp(Timestamp::new_millisecond(3_i64)),
];
assert_eq!(expect, actual);
}
#[test]
fn test_convert_time_values() {
// second
let actual = pb_values_to_values(
&ConcreteDataType::Time(TimeType::Second(TimeSecondType)),
Values {
time_second_values: vec![1_i64, 2_i64, 3_i64],
..Default::default()
},
);
let expect = vec![
Value::Time(Time::new_second(1_i64)),
Value::Time(Time::new_second(2_i64)),
Value::Time(Time::new_second(3_i64)),
];
assert_eq!(expect, actual);
// millisecond
let actual = pb_values_to_values(
&ConcreteDataType::Time(TimeType::Millisecond(TimeMillisecondType)),
Values {
time_millisecond_values: vec![1_i64, 2_i64, 3_i64],
..Default::default()
},
);
let expect = vec![
Value::Time(Time::new_millisecond(1_i64)),
Value::Time(Time::new_millisecond(2_i64)),
Value::Time(Time::new_millisecond(3_i64)),
];
assert_eq!(expect, actual);
}
#[test]
fn test_convert_interval_values() {
// year_month
let actual = pb_values_to_values(
&ConcreteDataType::Interval(IntervalType::YearMonth(IntervalYearMonthType)),
Values {
interval_year_month_values: vec![1_i32, 2_i32, 3_i32],
..Default::default()
},
);
let expect = vec![
Value::Interval(Interval::from_year_month(1_i32)),
Value::Interval(Interval::from_year_month(2_i32)),
Value::Interval(Interval::from_year_month(3_i32)),
];
assert_eq!(expect, actual);
// day_time
let actual = pb_values_to_values(
&ConcreteDataType::Interval(IntervalType::DayTime(IntervalDayTimeType)),
Values {
interval_day_time_values: vec![1_i64, 2_i64, 3_i64],
..Default::default()
},
);
let expect = vec![
Value::Interval(Interval::from_i64(1_i64)),
Value::Interval(Interval::from_i64(2_i64)),
Value::Interval(Interval::from_i64(3_i64)),
];
assert_eq!(expect, actual);
// month_day_nano
let actual = pb_values_to_values(
&ConcreteDataType::Interval(IntervalType::MonthDayNano(IntervalMonthDayNanoType)),
Values {
interval_month_day_nano_values: vec![
IntervalMonthDayNano {
months: 1,
days: 2,
nanoseconds: 3,
},
IntervalMonthDayNano {
months: 5,
days: 6,
nanoseconds: 7,
},
IntervalMonthDayNano {
months: 9,
days: 10,
nanoseconds: 11,
},
],
..Default::default()
},
);
let expect = vec![
Value::Interval(Interval::from_month_day_nano(1, 2, 3)),
Value::Interval(Interval::from_month_day_nano(5, 6, 7)),
Value::Interval(Interval::from_month_day_nano(9, 10, 11)),
];
assert_eq!(expect, actual);
}
macro_rules! test_convert_values {
($grpc_data_type: ident, $values: expr, $concrete_data_type: ident, $expected_ret: expr) => {
paste! {
#[test]
fn [<test_convert_ $grpc_data_type _values>]() {
let values = Values {
[<$grpc_data_type _values>]: $values,
..Default::default()
};
let data_type = ConcreteDataType::[<$concrete_data_type _datatype>]();
let result = pb_values_to_values(&data_type, values);
assert_eq!(
$expected_ret,
result
);
}
}
};
}
test_convert_values!(
i8,
vec![1_i32, 2, 3],
int8,
vec![Value::Int8(1), Value::Int8(2), Value::Int8(3)]
);
test_convert_values!(
u8,
vec![1_u32, 2, 3],
uint8,
vec![Value::UInt8(1), Value::UInt8(2), Value::UInt8(3)]
);
test_convert_values!(
i16,
vec![1_i32, 2, 3],
int16,
vec![Value::Int16(1), Value::Int16(2), Value::Int16(3)]
);
test_convert_values!(
u16,
vec![1_u32, 2, 3],
uint16,
vec![Value::UInt16(1), Value::UInt16(2), Value::UInt16(3)]
);
test_convert_values!(
i32,
vec![1, 2, 3],
int32,
vec![Value::Int32(1), Value::Int32(2), Value::Int32(3)]
);
test_convert_values!(
u32,
vec![1, 2, 3],
uint32,
vec![Value::UInt32(1), Value::UInt32(2), Value::UInt32(3)]
);
test_convert_values!(
i64,
vec![1, 2, 3],
int64,
vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)]
);
test_convert_values!(
u64,
vec![1, 2, 3],
uint64,
vec![Value::UInt64(1), Value::UInt64(2), Value::UInt64(3)]
);
test_convert_values!(
f32,
vec![1.0, 2.0, 3.0],
float32,
vec![
Value::Float32(1.0.into()),
Value::Float32(2.0.into()),
Value::Float32(3.0.into())
]
);
test_convert_values!(
f64,
vec![1.0, 2.0, 3.0],
float64,
vec![
Value::Float64(1.0.into()),
Value::Float64(2.0.into()),
Value::Float64(3.0.into())
]
);
test_convert_values!(
string,
vec!["1".to_string(), "2".to_string(), "3".to_string()],
string,
vec![
Value::String("1".into()),
Value::String("2".into()),
Value::String("3".into())
]
);
test_convert_values!(
binary,
vec!["1".into(), "2".into(), "3".into()],
binary,
vec![
Value::Binary(b"1".to_vec().into()),
Value::Binary(b"2".to_vec().into()),
Value::Binary(b"3".to_vec().into())
]
);
test_convert_values!(
date,
vec![1, 2, 3],
date,
vec![
Value::Date(1.into()),
Value::Date(2.into()),
Value::Date(3.into())
]
);
test_convert_values!(
datetime,
vec![1.into(), 2.into(), 3.into()],
datetime,
vec![
Value::DateTime(1.into()),
Value::DateTime(2.into()),
Value::DateTime(3.into())
]
);
}

View File

@@ -342,106 +342,11 @@ pub struct FlightContext {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::auth_header::AuthScheme;
use api::v1::{AuthHeader, Basic, Column};
use common_grpc::select::{null_mask, values};
use common_grpc_expr::column_to_vector;
use datatypes::prelude::{Vector, VectorRef};
use datatypes::vectors::{
BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector,
Int16Vector, Int32Vector, Int64Vector, Int8Vector, StringVector, UInt16Vector,
UInt32Vector, UInt64Vector, UInt8Vector,
};
use api::v1::{AuthHeader, Basic};
use crate::database::FlightContext;
#[test]
fn test_column_to_vector() {
let mut column = create_test_column(Arc::new(BooleanVector::from(vec![true])));
column.datatype = -100;
let result = column_to_vector(&column, 1);
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"Column datatype error, source: Unknown proto column datatype: -100"
);
macro_rules! test_with_vector {
($vector: expr) => {
let vector = Arc::new($vector);
let column = create_test_column(vector.clone());
let result = column_to_vector(&column, vector.len() as u32).unwrap();
assert_eq!(result, vector as VectorRef);
};
}
test_with_vector!(BooleanVector::from(vec![Some(true), None, Some(false)]));
test_with_vector!(Int8Vector::from(vec![Some(i8::MIN), None, Some(i8::MAX)]));
test_with_vector!(Int16Vector::from(vec![
Some(i16::MIN),
None,
Some(i16::MAX)
]));
test_with_vector!(Int32Vector::from(vec![
Some(i32::MIN),
None,
Some(i32::MAX)
]));
test_with_vector!(Int64Vector::from(vec![
Some(i64::MIN),
None,
Some(i64::MAX)
]));
test_with_vector!(UInt8Vector::from(vec![Some(u8::MIN), None, Some(u8::MAX)]));
test_with_vector!(UInt16Vector::from(vec![
Some(u16::MIN),
None,
Some(u16::MAX)
]));
test_with_vector!(UInt32Vector::from(vec![
Some(u32::MIN),
None,
Some(u32::MAX)
]));
test_with_vector!(UInt64Vector::from(vec![
Some(u64::MIN),
None,
Some(u64::MAX)
]));
test_with_vector!(Float32Vector::from(vec![
Some(f32::MIN),
None,
Some(f32::MAX)
]));
test_with_vector!(Float64Vector::from(vec![
Some(f64::MIN),
None,
Some(f64::MAX)
]));
test_with_vector!(BinaryVector::from(vec![
Some(b"".to_vec()),
None,
Some(b"hello".to_vec())
]));
test_with_vector!(StringVector::from(vec![Some(""), None, Some("foo"),]));
test_with_vector!(DateVector::from(vec![Some(1), None, Some(3)]));
test_with_vector!(DateTimeVector::from(vec![Some(4), None, Some(6)]));
}
fn create_test_column(vector: VectorRef) -> Column {
let wrapper: ColumnDataTypeWrapper = vector.data_type().try_into().unwrap();
Column {
column_name: "test".to_string(),
semantic_type: 1,
values: Some(values(&[vector.clone()]).unwrap()),
null_mask: null_mask(&[vector.clone()], vector.len()),
datatype: wrapper.datatype() as i32,
}
}
#[test]
fn test_flight_ctx() {
let mut ctx = FlightContext::default();

View File

@@ -47,6 +47,9 @@ pub enum Error {
location: Location,
},
#[snafu(display("Duplicated column name in gRPC requests, name: {}", name,))]
DuplicatedColumnName { name: String, location: Location },
#[snafu(display("Missing timestamp column, msg: {}", msg))]
MissingTimestampColumn { msg: String, location: Location },
@@ -101,9 +104,9 @@ impl ErrorExt for Error {
Error::IllegalDeleteRequest { .. } => StatusCode::InvalidArguments,
Error::ColumnDataType { .. } => StatusCode::Internal,
Error::DuplicatedTimestampColumn { .. } | Error::MissingTimestampColumn { .. } => {
StatusCode::InvalidArguments
}
Error::DuplicatedTimestampColumn { .. }
| Error::DuplicatedColumnName { .. }
| Error::MissingTimestampColumn { .. } => StatusCode::InvalidArguments,
Error::InvalidColumnProto { .. } => StatusCode::InvalidArguments,
Error::CreateVector { .. } => StatusCode::InvalidArguments,
Error::MissingField { .. } => StatusCode::InvalidArguments,

View File

@@ -12,233 +12,30 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::collections::HashMap;
use api::helper;
use api::helper::ColumnDataTypeWrapper;
use api::v1::column::Values;
use api::v1::{
AddColumn, AddColumns, Column, ColumnDataType, ColumnDef, CreateTableExpr,
InsertRequest as GrpcInsertRequest, SemanticType,
};
use api::v1::{AddColumns, Column, CreateTableExpr, InsertRequest as GrpcInsertRequest};
use common_base::BitVec;
use common_time::time::Time;
use common_time::timestamp::Timestamp;
use common_time::{Date, DateTime, Interval};
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::{ValueRef, VectorRef};
use datatypes::scalars::ScalarVector;
use datatypes::prelude::VectorRef;
use datatypes::schema::SchemaRef;
use datatypes::types::{
Int16Type, Int8Type, IntervalType, TimeType, TimestampType, UInt16Type, UInt8Type,
};
use datatypes::value::Value;
use datatypes::vectors::{
BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector,
Int32Vector, Int64Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector,
IntervalYearMonthVector, PrimitiveVector, StringVector, TimeMicrosecondVector,
TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, TimestampMicrosecondVector,
TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt32Vector,
UInt64Vector,
};
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{ensure, ResultExt};
use table::metadata::TableId;
use table::requests::InsertRequest;
use crate::error::{
ColumnAlreadyExistsSnafu, ColumnDataTypeSnafu, CreateVectorSnafu,
DuplicatedTimestampColumnSnafu, InvalidColumnProtoSnafu, MissingTimestampColumnSnafu, Result,
ColumnAlreadyExistsSnafu, ColumnDataTypeSnafu, CreateVectorSnafu, Result,
UnexpectedValuesLengthSnafu,
};
const TAG_SEMANTIC_TYPE: i32 = SemanticType::Tag as i32;
const TIMESTAMP_SEMANTIC_TYPE: i32 = SemanticType::Timestamp as i32;
#[inline]
fn build_column_def(column_name: &str, datatype: i32, nullable: bool) -> ColumnDef {
ColumnDef {
name: column_name.to_string(),
datatype,
is_nullable: nullable,
default_constraint: vec![],
}
}
use crate::util;
use crate::util::ColumnExpr;
pub fn find_new_columns(schema: &SchemaRef, columns: &[Column]) -> Result<Option<AddColumns>> {
let mut columns_to_add = Vec::default();
let mut new_columns: HashSet<String> = HashSet::default();
for Column {
column_name,
semantic_type,
datatype,
..
} in columns
{
if schema.column_schema_by_name(column_name).is_none() && !new_columns.contains(column_name)
{
let column_def = Some(build_column_def(column_name, *datatype, true));
columns_to_add.push(AddColumn {
column_def,
is_key: *semantic_type == TAG_SEMANTIC_TYPE,
location: None,
});
let _ = new_columns.insert(column_name.to_string());
}
}
if columns_to_add.is_empty() {
Ok(None)
} else {
Ok(Some(AddColumns {
add_columns: columns_to_add,
}))
}
}
pub fn column_to_vector(column: &Column, rows: u32) -> Result<VectorRef> {
let wrapper = ColumnDataTypeWrapper::try_new(column.datatype).context(ColumnDataTypeSnafu)?;
let column_datatype = wrapper.datatype();
let rows = rows as usize;
let mut vector = ConcreteDataType::from(wrapper).create_mutable_vector(rows);
if let Some(values) = &column.values {
let values = collect_column_values(column_datatype, values);
let mut values_iter = values.into_iter();
let null_mask = BitVec::from_slice(&column.null_mask);
let mut nulls_iter = null_mask.iter().by_vals().fuse();
for i in 0..rows {
if let Some(true) = nulls_iter.next() {
vector.push_null();
} else {
let value_ref = values_iter
.next()
.with_context(|| InvalidColumnProtoSnafu {
err_msg: format!(
"value not found at position {} of column {}",
i, &column.column_name
),
})?;
vector
.try_push_value_ref(value_ref)
.context(CreateVectorSnafu)?;
}
}
} else {
(0..rows).for_each(|_| vector.push_null());
}
Ok(vector.to_vector())
}
fn collect_column_values(column_datatype: ColumnDataType, values: &Values) -> Vec<ValueRef> {
macro_rules! collect_values {
($value: expr, $mapper: expr) => {
$value.iter().map($mapper).collect::<Vec<ValueRef>>()
};
}
match column_datatype {
ColumnDataType::Boolean => collect_values!(values.bool_values, |v| ValueRef::from(*v)),
ColumnDataType::Int8 => collect_values!(values.i8_values, |v| ValueRef::from(*v as i8)),
ColumnDataType::Int16 => {
collect_values!(values.i16_values, |v| ValueRef::from(*v as i16))
}
ColumnDataType::Int32 => {
collect_values!(values.i32_values, |v| ValueRef::from(*v))
}
ColumnDataType::Int64 => {
collect_values!(values.i64_values, |v| ValueRef::from(*v))
}
ColumnDataType::Uint8 => {
collect_values!(values.u8_values, |v| ValueRef::from(*v as u8))
}
ColumnDataType::Uint16 => {
collect_values!(values.u16_values, |v| ValueRef::from(*v as u16))
}
ColumnDataType::Uint32 => {
collect_values!(values.u32_values, |v| ValueRef::from(*v))
}
ColumnDataType::Uint64 => {
collect_values!(values.u64_values, |v| ValueRef::from(*v))
}
ColumnDataType::Float32 => collect_values!(values.f32_values, |v| ValueRef::from(*v)),
ColumnDataType::Float64 => collect_values!(values.f64_values, |v| ValueRef::from(*v)),
ColumnDataType::Binary => {
collect_values!(values.binary_values, |v| ValueRef::from(v.as_slice()))
}
ColumnDataType::String => {
collect_values!(values.string_values, |v| ValueRef::from(v.as_str()))
}
ColumnDataType::Date => {
collect_values!(values.date_values, |v| ValueRef::Date(Date::new(*v)))
}
ColumnDataType::Datetime => {
collect_values!(values.datetime_values, |v| ValueRef::DateTime(
DateTime::new(*v)
))
}
ColumnDataType::TimestampSecond => {
collect_values!(values.ts_second_values, |v| ValueRef::Timestamp(
Timestamp::new_second(*v)
))
}
ColumnDataType::TimestampMillisecond => {
collect_values!(values.ts_millisecond_values, |v| ValueRef::Timestamp(
Timestamp::new_millisecond(*v)
))
}
ColumnDataType::TimestampMicrosecond => {
collect_values!(values.ts_millisecond_values, |v| ValueRef::Timestamp(
Timestamp::new_microsecond(*v)
))
}
ColumnDataType::TimestampNanosecond => {
collect_values!(values.ts_millisecond_values, |v| ValueRef::Timestamp(
Timestamp::new_nanosecond(*v)
))
}
ColumnDataType::TimeSecond => {
collect_values!(values.time_second_values, |v| ValueRef::Time(
Time::new_second(*v)
))
}
ColumnDataType::TimeMillisecond => {
collect_values!(values.time_millisecond_values, |v| ValueRef::Time(
Time::new_millisecond(*v)
))
}
ColumnDataType::TimeMicrosecond => {
collect_values!(values.time_millisecond_values, |v| ValueRef::Time(
Time::new_microsecond(*v)
))
}
ColumnDataType::TimeNanosecond => {
collect_values!(values.time_millisecond_values, |v| ValueRef::Time(
Time::new_nanosecond(*v)
))
}
ColumnDataType::IntervalYearMonth => {
collect_values!(values.interval_year_month_values, |v| {
ValueRef::Interval(Interval::from_i32(*v))
})
}
ColumnDataType::IntervalDayTime => {
collect_values!(values.interval_day_time_values, |v| {
ValueRef::Interval(Interval::from_i64(*v))
})
}
ColumnDataType::IntervalMonthDayNano => {
collect_values!(values.interval_month_day_nano_values, |v| {
ValueRef::Interval(Interval::from_month_day_nano(
v.months,
v.days,
v.nanoseconds,
))
})
}
}
let column_exprs = ColumnExpr::from_columns(columns);
util::extract_new_columns(schema, column_exprs)
}
/// Try to build create table request from insert data.
@@ -250,70 +47,16 @@ pub fn build_create_expr_from_insertion(
columns: &[Column],
engine: &str,
) -> Result<CreateTableExpr> {
let mut new_columns: HashSet<String> = HashSet::default();
let mut column_defs = Vec::default();
let mut primary_key_indices = Vec::default();
let mut timestamp_index = usize::MAX;
for Column {
column_name,
semantic_type,
datatype,
..
} in columns
{
if !new_columns.contains(column_name) {
let mut is_nullable = true;
match *semantic_type {
TAG_SEMANTIC_TYPE => primary_key_indices.push(column_defs.len()),
TIMESTAMP_SEMANTIC_TYPE => {
ensure!(
timestamp_index == usize::MAX,
DuplicatedTimestampColumnSnafu {
exists: &columns[timestamp_index].column_name,
duplicated: column_name,
}
);
timestamp_index = column_defs.len();
// Timestamp column must not be null.
is_nullable = false;
}
_ => {}
}
let column_def = build_column_def(column_name, *datatype, is_nullable);
column_defs.push(column_def);
let _ = new_columns.insert(column_name.to_string());
}
}
ensure!(
timestamp_index != usize::MAX,
MissingTimestampColumnSnafu { msg: table_name }
);
let timestamp_field_name = columns[timestamp_index].column_name.clone();
let primary_keys = primary_key_indices
.iter()
.map(|idx| columns[*idx].column_name.clone())
.collect::<Vec<_>>();
let expr = CreateTableExpr {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
desc: "Created on insertion".to_string(),
column_defs,
time_index: timestamp_field_name,
primary_keys,
create_if_not_exists: true,
table_options: Default::default(),
table_id: table_id.map(|id| api::v1::TableId { id }),
region_numbers: vec![0], // TODO:(hl): region number should be allocated by frontend
engine: engine.to_string(),
};
Ok(expr)
let column_exprs = ColumnExpr::from_columns(columns);
util::build_create_table_expr(
catalog_name,
schema_name,
table_id,
table_name,
column_exprs,
engine,
"Created on insertion",
)
}
pub fn to_table_insert_request(
@@ -364,10 +107,10 @@ pub(crate) fn add_values_to_builder(
null_mask: Vec<u8>,
) -> Result<VectorRef> {
if null_mask.is_empty() {
Ok(values_to_vector(&data_type, values))
Ok(helper::pb_values_to_vector_ref(&data_type, values))
} else {
let builder = &mut data_type.create_mutable_vector(row_count);
let values = convert_values(&data_type, values);
let values = helper::pb_values_to_values(&data_type, values);
let null_mask = BitVec::from_vec(null_mask);
ensure!(
null_mask.count_ones() + values.len() == row_count,
@@ -392,231 +135,6 @@ pub(crate) fn add_values_to_builder(
}
}
fn values_to_vector(data_type: &ConcreteDataType, values: Values) -> VectorRef {
match data_type {
ConcreteDataType::Boolean(_) => Arc::new(BooleanVector::from(values.bool_values)),
ConcreteDataType::Int8(_) => Arc::new(PrimitiveVector::<Int8Type>::from_iter_values(
values.i8_values.into_iter().map(|x| x as i8),
)),
ConcreteDataType::Int16(_) => Arc::new(PrimitiveVector::<Int16Type>::from_iter_values(
values.i16_values.into_iter().map(|x| x as i16),
)),
ConcreteDataType::Int32(_) => Arc::new(Int32Vector::from_vec(values.i32_values)),
ConcreteDataType::Int64(_) => Arc::new(Int64Vector::from_vec(values.i64_values)),
ConcreteDataType::UInt8(_) => Arc::new(PrimitiveVector::<UInt8Type>::from_iter_values(
values.u8_values.into_iter().map(|x| x as u8),
)),
ConcreteDataType::UInt16(_) => Arc::new(PrimitiveVector::<UInt16Type>::from_iter_values(
values.u16_values.into_iter().map(|x| x as u16),
)),
ConcreteDataType::UInt32(_) => Arc::new(UInt32Vector::from_vec(values.u32_values)),
ConcreteDataType::UInt64(_) => Arc::new(UInt64Vector::from_vec(values.u64_values)),
ConcreteDataType::Float32(_) => Arc::new(Float32Vector::from_vec(values.f32_values)),
ConcreteDataType::Float64(_) => Arc::new(Float64Vector::from_vec(values.f64_values)),
ConcreteDataType::Binary(_) => Arc::new(BinaryVector::from(values.binary_values)),
ConcreteDataType::String(_) => Arc::new(StringVector::from_vec(values.string_values)),
ConcreteDataType::Date(_) => Arc::new(DateVector::from_vec(values.date_values)),
ConcreteDataType::DateTime(_) => Arc::new(DateTimeVector::from_vec(values.datetime_values)),
ConcreteDataType::Timestamp(unit) => match unit {
TimestampType::Second(_) => {
Arc::new(TimestampSecondVector::from_vec(values.ts_second_values))
}
TimestampType::Millisecond(_) => Arc::new(TimestampMillisecondVector::from_vec(
values.ts_millisecond_values,
)),
TimestampType::Microsecond(_) => Arc::new(TimestampMicrosecondVector::from_vec(
values.ts_microsecond_values,
)),
TimestampType::Nanosecond(_) => Arc::new(TimestampNanosecondVector::from_vec(
values.ts_nanosecond_values,
)),
},
ConcreteDataType::Time(unit) => match unit {
TimeType::Second(_) => Arc::new(TimeSecondVector::from_iter_values(
values.time_second_values.iter().map(|x| *x as i32),
)),
TimeType::Millisecond(_) => Arc::new(TimeMillisecondVector::from_iter_values(
values.time_millisecond_values.iter().map(|x| *x as i32),
)),
TimeType::Microsecond(_) => Arc::new(TimeMicrosecondVector::from_vec(
values.time_microsecond_values,
)),
TimeType::Nanosecond(_) => Arc::new(TimeNanosecondVector::from_vec(
values.time_nanosecond_values,
)),
},
ConcreteDataType::Interval(unit) => match unit {
IntervalType::YearMonth(_) => Arc::new(IntervalYearMonthVector::from_vec(
values.interval_year_month_values,
)),
IntervalType::DayTime(_) => Arc::new(IntervalDayTimeVector::from_vec(
values.interval_day_time_values,
)),
IntervalType::MonthDayNano(_) => {
Arc::new(IntervalMonthDayNanoVector::from_iter_values(
values.interval_month_day_nano_values.iter().map(|x| {
Interval::from_month_day_nano(x.months, x.days, x.nanoseconds).to_i128()
}),
))
}
},
ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => {
unreachable!()
}
}
}
fn convert_values(data_type: &ConcreteDataType, values: Values) -> Vec<Value> {
// TODO(fys): use macros to optimize code
match data_type {
ConcreteDataType::Int64(_) => values
.i64_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::Float64(_) => values
.f64_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::String(_) => values
.string_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::Boolean(_) => values
.bool_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::Int8(_) => values
.i8_values
.into_iter()
// Safety: Since i32 only stores i8 data here, so i32 as i8 is safe.
.map(|val| (val as i8).into())
.collect(),
ConcreteDataType::Int16(_) => values
.i16_values
.into_iter()
// Safety: Since i32 only stores i16 data here, so i32 as i16 is safe.
.map(|val| (val as i16).into())
.collect(),
ConcreteDataType::Int32(_) => values
.i32_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::UInt8(_) => values
.u8_values
.into_iter()
// Safety: Since i32 only stores u8 data here, so i32 as u8 is safe.
.map(|val| (val as u8).into())
.collect(),
ConcreteDataType::UInt16(_) => values
.u16_values
.into_iter()
// Safety: Since i32 only stores u16 data here, so i32 as u16 is safe.
.map(|val| (val as u16).into())
.collect(),
ConcreteDataType::UInt32(_) => values
.u32_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::UInt64(_) => values
.u64_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::Float32(_) => values
.f32_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::Binary(_) => values
.binary_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::DateTime(_) => values
.datetime_values
.into_iter()
.map(|v| Value::DateTime(v.into()))
.collect(),
ConcreteDataType::Date(_) => values
.date_values
.into_iter()
.map(|v| Value::Date(v.into()))
.collect(),
ConcreteDataType::Timestamp(TimestampType::Second(_)) => values
.ts_second_values
.into_iter()
.map(|v| Value::Timestamp(Timestamp::new_second(v)))
.collect(),
ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => values
.ts_millisecond_values
.into_iter()
.map(|v| Value::Timestamp(Timestamp::new_millisecond(v)))
.collect(),
ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => values
.ts_microsecond_values
.into_iter()
.map(|v| Value::Timestamp(Timestamp::new_microsecond(v)))
.collect(),
ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => values
.ts_nanosecond_values
.into_iter()
.map(|v| Value::Timestamp(Timestamp::new_nanosecond(v)))
.collect(),
ConcreteDataType::Time(TimeType::Second(_)) => values
.time_second_values
.into_iter()
.map(|v| Value::Time(Time::new_second(v)))
.collect(),
ConcreteDataType::Time(TimeType::Millisecond(_)) => values
.time_millisecond_values
.into_iter()
.map(|v| Value::Time(Time::new_millisecond(v)))
.collect(),
ConcreteDataType::Time(TimeType::Microsecond(_)) => values
.time_microsecond_values
.into_iter()
.map(|v| Value::Time(Time::new_microsecond(v)))
.collect(),
ConcreteDataType::Time(TimeType::Nanosecond(_)) => values
.time_nanosecond_values
.into_iter()
.map(|v| Value::Time(Time::new_nanosecond(v)))
.collect(),
ConcreteDataType::Interval(IntervalType::YearMonth(_)) => values
.interval_year_month_values
.into_iter()
.map(|v| Value::Interval(Interval::from_i32(v)))
.collect(),
ConcreteDataType::Interval(IntervalType::DayTime(_)) => values
.interval_day_time_values
.into_iter()
.map(|v| Value::Interval(Interval::from_i64(v)))
.collect(),
ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => values
.interval_month_day_nano_values
.into_iter()
.map(|v| {
Value::Interval(Interval::from_month_day_nano(
v.months,
v.days,
v.nanoseconds,
))
})
.collect(),
ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => {
unreachable!()
}
}
}
fn is_null(null_mask: &BitVec, idx: usize) -> Option<bool> {
null_mask.get(idx).as_deref().copied()
}
@@ -635,12 +153,7 @@ mod tests {
use common_time::timestamp::{TimeUnit, Timestamp};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use datatypes::types::{
IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, TimeMillisecondType,
TimeSecondType, TimeType, TimestampMillisecondType, TimestampSecondType, TimestampType,
};
use datatypes::value::Value;
use paste::paste;
use snafu::ResultExt;
use super::*;
@@ -881,280 +394,6 @@ mod tests {
assert_eq!(Value::Timestamp(Timestamp::new_millisecond(101)), ts.get(1));
}
macro_rules! test_convert_values {
($grpc_data_type: ident, $values: expr, $concrete_data_type: ident, $expected_ret: expr) => {
paste! {
#[test]
fn [<test_convert_ $grpc_data_type _values>]() {
let values = Values {
[<$grpc_data_type _values>]: $values,
..Default::default()
};
let data_type = ConcreteDataType::[<$concrete_data_type _datatype>]();
let result = convert_values(&data_type, values);
assert_eq!(
$expected_ret,
result
);
}
}
};
}
test_convert_values!(
i8,
vec![1_i32, 2, 3],
int8,
vec![Value::Int8(1), Value::Int8(2), Value::Int8(3)]
);
test_convert_values!(
u8,
vec![1_u32, 2, 3],
uint8,
vec![Value::UInt8(1), Value::UInt8(2), Value::UInt8(3)]
);
test_convert_values!(
i16,
vec![1_i32, 2, 3],
int16,
vec![Value::Int16(1), Value::Int16(2), Value::Int16(3)]
);
test_convert_values!(
u16,
vec![1_u32, 2, 3],
uint16,
vec![Value::UInt16(1), Value::UInt16(2), Value::UInt16(3)]
);
test_convert_values!(
i32,
vec![1, 2, 3],
int32,
vec![Value::Int32(1), Value::Int32(2), Value::Int32(3)]
);
test_convert_values!(
u32,
vec![1, 2, 3],
uint32,
vec![Value::UInt32(1), Value::UInt32(2), Value::UInt32(3)]
);
test_convert_values!(
i64,
vec![1, 2, 3],
int64,
vec![Value::Int64(1), Value::Int64(2), Value::Int64(3)]
);
test_convert_values!(
u64,
vec![1, 2, 3],
uint64,
vec![Value::UInt64(1), Value::UInt64(2), Value::UInt64(3)]
);
test_convert_values!(
f32,
vec![1.0, 2.0, 3.0],
float32,
vec![
Value::Float32(1.0.into()),
Value::Float32(2.0.into()),
Value::Float32(3.0.into())
]
);
test_convert_values!(
f64,
vec![1.0, 2.0, 3.0],
float64,
vec![
Value::Float64(1.0.into()),
Value::Float64(2.0.into()),
Value::Float64(3.0.into())
]
);
test_convert_values!(
string,
vec!["1".to_string(), "2".to_string(), "3".to_string()],
string,
vec![
Value::String("1".into()),
Value::String("2".into()),
Value::String("3".into())
]
);
test_convert_values!(
binary,
vec!["1".into(), "2".into(), "3".into()],
binary,
vec![
Value::Binary(b"1".to_vec().into()),
Value::Binary(b"2".to_vec().into()),
Value::Binary(b"3".to_vec().into())
]
);
test_convert_values!(
date,
vec![1, 2, 3],
date,
vec![
Value::Date(1.into()),
Value::Date(2.into()),
Value::Date(3.into())
]
);
test_convert_values!(
datetime,
vec![1.into(), 2.into(), 3.into()],
datetime,
vec![
Value::DateTime(1.into()),
Value::DateTime(2.into()),
Value::DateTime(3.into())
]
);
#[test]
fn test_convert_timestamp_values() {
// second
let actual = convert_values(
&ConcreteDataType::Timestamp(TimestampType::Second(TimestampSecondType)),
Values {
ts_second_values: vec![1_i64, 2_i64, 3_i64],
..Default::default()
},
);
let expect = vec![
Value::Timestamp(Timestamp::new_second(1_i64)),
Value::Timestamp(Timestamp::new_second(2_i64)),
Value::Timestamp(Timestamp::new_second(3_i64)),
];
assert_eq!(expect, actual);
// millisecond
let actual = convert_values(
&ConcreteDataType::Timestamp(TimestampType::Millisecond(TimestampMillisecondType)),
Values {
ts_millisecond_values: vec![1_i64, 2_i64, 3_i64],
..Default::default()
},
);
let expect = vec![
Value::Timestamp(Timestamp::new_millisecond(1_i64)),
Value::Timestamp(Timestamp::new_millisecond(2_i64)),
Value::Timestamp(Timestamp::new_millisecond(3_i64)),
];
assert_eq!(expect, actual);
}
#[test]
fn test_convert_time_values() {
// second
let actual = convert_values(
&ConcreteDataType::Time(TimeType::Second(TimeSecondType)),
Values {
time_second_values: vec![1_i64, 2_i64, 3_i64],
..Default::default()
},
);
let expect = vec![
Value::Time(Time::new_second(1_i64)),
Value::Time(Time::new_second(2_i64)),
Value::Time(Time::new_second(3_i64)),
];
assert_eq!(expect, actual);
// millisecond
let actual = convert_values(
&ConcreteDataType::Time(TimeType::Millisecond(TimeMillisecondType)),
Values {
time_millisecond_values: vec![1_i64, 2_i64, 3_i64],
..Default::default()
},
);
let expect = vec![
Value::Time(Time::new_millisecond(1_i64)),
Value::Time(Time::new_millisecond(2_i64)),
Value::Time(Time::new_millisecond(3_i64)),
];
assert_eq!(expect, actual);
}
#[test]
fn test_convert_interval_values() {
// year_month
let actual = convert_values(
&ConcreteDataType::Interval(IntervalType::YearMonth(IntervalYearMonthType)),
Values {
interval_year_month_values: vec![1_i32, 2_i32, 3_i32],
..Default::default()
},
);
let expect = vec![
Value::Interval(Interval::from_year_month(1_i32)),
Value::Interval(Interval::from_year_month(2_i32)),
Value::Interval(Interval::from_year_month(3_i32)),
];
assert_eq!(expect, actual);
// day_time
let actual = convert_values(
&ConcreteDataType::Interval(IntervalType::DayTime(IntervalDayTimeType)),
Values {
interval_day_time_values: vec![1_i64, 2_i64, 3_i64],
..Default::default()
},
);
let expect = vec![
Value::Interval(Interval::from_i64(1_i64)),
Value::Interval(Interval::from_i64(2_i64)),
Value::Interval(Interval::from_i64(3_i64)),
];
assert_eq!(expect, actual);
// month_day_nano
let actual = convert_values(
&ConcreteDataType::Interval(IntervalType::MonthDayNano(IntervalMonthDayNanoType)),
Values {
interval_month_day_nano_values: vec![
IntervalMonthDayNano {
months: 1,
days: 2,
nanoseconds: 3,
},
IntervalMonthDayNano {
months: 5,
days: 6,
nanoseconds: 7,
},
IntervalMonthDayNano {
months: 9,
days: 10,
nanoseconds: 11,
},
],
..Default::default()
},
);
let expect = vec![
Value::Interval(Interval::from_month_day_nano(1, 2, 3)),
Value::Interval(Interval::from_month_day_nano(5, 6, 7)),
Value::Interval(Interval::from_month_day_nano(9, 10, 11)),
];
assert_eq!(expect, actual);
}
#[test]
fn test_is_null() {
let null_mask = BitVec::from_slice(&[0b0000_0001, 0b0000_1000]);
@@ -1178,7 +417,7 @@ mod tests {
};
let host_column = Column {
column_name: "host".to_string(),
semantic_type: TAG_SEMANTIC_TYPE,
semantic_type: SemanticType::Tag as i32,
values: Some(host_vals),
null_mask: vec![0],
datatype: ColumnDataType::String as i32,
@@ -1248,7 +487,7 @@ mod tests {
};
let ts_column = Column {
column_name: "ts".to_string(),
semantic_type: TIMESTAMP_SEMANTIC_TYPE,
semantic_type: SemanticType::Timestamp as i32,
values: Some(ts_vals),
null_mask: vec![0],
datatype: ColumnDataType::TimestampMillisecond as i32,

View File

@@ -16,6 +16,7 @@ mod alter;
pub mod delete;
pub mod error;
pub mod insert;
mod util;
pub use alter::{alter_expr_to_request, create_expr_to_request, create_table_schema};
pub use insert::{build_create_expr_from_insertion, column_to_vector, find_new_columns};
pub use insert::{build_create_expr_from_insertion, find_new_columns};

View File

@@ -0,0 +1,184 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use api::v1::{
AddColumn, AddColumns, Column, ColumnDef, ColumnSchema, CreateTableExpr, SemanticType,
};
use datatypes::schema::Schema;
use snafu::{ensure, OptionExt};
use table::metadata::TableId;
use crate::error::{
DuplicatedColumnNameSnafu, DuplicatedTimestampColumnSnafu, MissingTimestampColumnSnafu, Result,
};
pub struct ColumnExpr {
pub column_name: String,
pub datatype: i32,
pub semantic_type: i32,
}
impl ColumnExpr {
#[inline]
pub fn from_columns(columns: &[Column]) -> Vec<Self> {
columns.iter().map(Self::from).collect()
}
}
impl From<&Column> for ColumnExpr {
fn from(column: &Column) -> Self {
Self {
column_name: column.column_name.clone(),
datatype: column.datatype,
semantic_type: column.semantic_type,
}
}
}
impl From<&ColumnSchema> for ColumnExpr {
fn from(schema: &ColumnSchema) -> Self {
Self {
column_name: schema.column_name.clone(),
datatype: schema.datatype,
semantic_type: schema.semantic_type,
}
}
}
pub fn build_create_table_expr(
catalog_name: &str,
schema_name: &str,
table_id: Option<TableId>,
table_name: &str,
column_exprs: Vec<ColumnExpr>,
engine: &str,
desc: &str,
) -> Result<CreateTableExpr> {
// Check for duplicate names. If found, raise an error.
//
// The introduction of hashset incurs additional memory overhead
// but achieves a time complexity of O(1).
//
// The separate iteration over `column_exprs` is because the CPU prefers
// smaller loops, and avoid cloning String.
let mut distinct_names = HashSet::with_capacity(column_exprs.len());
for ColumnExpr { column_name, .. } in &column_exprs {
ensure!(
distinct_names.insert(column_name),
DuplicatedColumnNameSnafu { name: column_name }
);
}
let mut column_defs = Vec::with_capacity(column_exprs.len());
let mut primary_keys = Vec::default();
let mut time_index = None;
for ColumnExpr {
column_name,
datatype,
semantic_type,
} in column_exprs
{
let mut is_nullable = true;
match semantic_type {
v if v == SemanticType::Tag as i32 => primary_keys.push(column_name.clone()),
v if v == SemanticType::Timestamp as i32 => {
ensure!(
time_index.is_none(),
DuplicatedTimestampColumnSnafu {
exists: time_index.unwrap(),
duplicated: &column_name,
}
);
time_index = Some(column_name.clone());
// Timestamp column must not be null.
is_nullable = false;
}
_ => {}
}
let column_def = ColumnDef {
name: column_name,
datatype,
is_nullable,
default_constraint: vec![],
};
column_defs.push(column_def);
}
let time_index = time_index.context(MissingTimestampColumnSnafu {
msg: format!("table is {}", table_name),
})?;
let expr = CreateTableExpr {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
desc: desc.to_string(),
column_defs,
time_index,
primary_keys,
create_if_not_exists: true,
table_options: Default::default(),
table_id: table_id.map(|id| api::v1::TableId { id }),
// TODO(hl): region number should be allocated by frontend
region_numbers: vec![0],
engine: engine.to_string(),
};
Ok(expr)
}
pub fn extract_new_columns(
schema: &Schema,
column_exprs: Vec<ColumnExpr>,
) -> Result<Option<AddColumns>> {
let columns_to_add = column_exprs
.into_iter()
.filter(|expr| schema.column_schema_by_name(&expr.column_name).is_none())
.map(|expr| {
let is_key = expr.semantic_type == SemanticType::Tag as i32;
let column_def = Some(ColumnDef {
name: expr.column_name,
datatype: expr.datatype,
is_nullable: true,
default_constraint: vec![],
});
AddColumn {
column_def,
is_key,
location: None,
}
})
.collect::<Vec<_>>();
if columns_to_add.is_empty() {
Ok(None)
} else {
let mut distinct_names = HashSet::with_capacity(columns_to_add.len());
for add_column in &columns_to_add {
let name = &add_column.column_def.as_ref().unwrap().name;
ensure!(
distinct_names.insert(name),
DuplicatedColumnNameSnafu { name }
);
}
Ok(Some(AddColumns {
add_columns: columns_to_add,
}))
}
}

View File

@@ -232,7 +232,7 @@ impl GrpcQueryHandler for Instance {
self.handle_query(query, ctx).await
}
Request::Ddl(request) => self.handle_ddl(request, ctx).await,
Request::RowInserts(_) | Request::RowDelete(_) => UnsupportedGrpcRequestSnafu {
Request::RowInserts(_) | Request::RowDeletes(_) => UnsupportedGrpcRequestSnafu {
kind: "row insert/delete",
}
.fail(),

View File

@@ -665,7 +665,7 @@ impl GrpcQueryHandler for DistInstance {
async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
match request {
Request::Inserts(requests) => self.handle_dist_insert(requests, ctx).await,
Request::RowInserts(_) | Request::RowDelete(_) => NotSupportedSnafu {
Request::RowInserts(_) | Request::RowDeletes(_) => NotSupportedSnafu {
feat: "row insert/delete",
}
.fail(),

View File

@@ -44,7 +44,7 @@ impl GrpcQueryHandler for Instance {
let output = match request {
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
Request::RowInserts(_) | Request::RowDelete(_) => {
Request::RowInserts(_) | Request::RowDeletes(_) => {
return NotSupportedSnafu {
feat: "row insert/delete",
}

View File

@@ -162,7 +162,7 @@ impl GrpcQueryHandler for DummyInstance {
Request::Inserts(_)
| Request::Deletes(_)
| Request::RowInserts(_)
| Request::RowDelete(_) => unimplemented!(),
| Request::RowDeletes(_) => unimplemented!(),
Request::Query(query_request) => {
let query = query_request.query.unwrap();
match query {