feat: struct value and vector (#7033)

* feat: struct value

Signed-off-by: Ning Sun <sunning@greptime.com>

* feat: update for proto module

* feat: wip struct type

* feat: implement more vector operations

* feat: make datatype and api

* feat: resolve some compilation issues

* feat: resolve all compilation issues

* chore: format update

* test: resolve tests

* test: test and refactor value-to-pb

* feat: add more tests and fix for value types

* chore: remove dbg

* feat: test and fix iterator

* fix: resolve struct_type issue

* refactor: use vec for struct items

* chore: update proto to main branch

* refactor: address some of review issues

* refactor: update for further review

* Add validation on new methods

* feat: update struct/list json serialization

* refactor: reimplement get in struct_vector

* refactor: struct vector functions

* refactor: fix lint issue

* refactor: address review comments

---------

Signed-off-by: Ning Sun <sunning@greptime.com>
This commit is contained in:
Ning Sun
2025-10-11 05:49:51 +08:00
committed by GitHub
parent 3738440753
commit 749a5ab165
70 changed files with 1581 additions and 503 deletions

2
Cargo.lock generated
View File

@@ -5325,7 +5325,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=3e821d0d405e6733690a4e4352812ba2ff780a3e#3e821d0d405e6733690a4e4352812ba2ff780a3e"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=d75496d5d09dedcd0edcade57ccf0a522f4393ae#d75496d5d09dedcd0edcade57ccf0a522f4393ae"
dependencies = [
"prost 0.13.5",
"prost-types 0.13.5",

View File

@@ -147,7 +147,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3e821d0d405e6733690a4e4352812ba2ff780a3e" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "d75496d5d09dedcd0edcade57ccf0a522f4393ae" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -15,7 +15,6 @@
use std::collections::HashSet;
use std::sync::Arc;
use common_base::BitVec;
use common_decimal::Decimal128;
use common_decimal::decimal128::{DECIMAL128_DEFAULT_SCALE, DECIMAL128_MAX_PRECISION};
use common_time::time::Time;
@@ -24,9 +23,12 @@ use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth
use datatypes::prelude::{ConcreteDataType, ValueRef};
use datatypes::scalars::ScalarVector;
use datatypes::types::{
Int8Type, Int16Type, IntervalType, TimeType, TimestampType, UInt8Type, UInt16Type,
Int8Type, Int16Type, IntervalType, StructField, StructType, TimeType, TimestampType, UInt8Type,
UInt16Type,
};
use datatypes::value::{
ListValue, ListValueRef, OrderedF32, OrderedF64, StructValue, StructValueRef, Value,
};
use datatypes::value::{OrderedF32, OrderedF64, Value};
use datatypes::vectors::{
BinaryVector, BooleanVector, DateVector, Decimal128Vector, Float32Vector, Float64Vector,
Int32Vector, Int64Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector,
@@ -42,14 +44,14 @@ use greptime_proto::v1::query_request::Query;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{
self, ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, JsonTypeExtension,
QueryRequest, Row, SemanticType, VectorTypeExtension,
ListTypeExtension, QueryRequest, Row, SemanticType, StructTypeExtension, VectorTypeExtension,
};
use paste::paste;
use snafu::prelude::*;
use crate::error::{self, InconsistentTimeUnitSnafu, InvalidTimeUnitSnafu, Result};
use crate::v1::column::Values;
use crate::v1::{Column, ColumnDataType, Value as GrpcValue};
use crate::v1::{ColumnDataType, Value as GrpcValue};
/// ColumnDataTypeWrapper is a wrapper of ColumnDataType and ColumnDataTypeExtension.
/// It could be used to convert with ConcreteDataType.
@@ -85,7 +87,7 @@ impl ColumnDataTypeWrapper {
/// Get a tuple of ColumnDataType and ColumnDataTypeExtension.
pub fn to_parts(&self) -> (ColumnDataType, Option<ColumnDataTypeExtension>) {
(self.datatype, self.datatype_ext)
(self.datatype, self.datatype_ext.clone())
}
}
@@ -159,6 +161,45 @@ impl From<ColumnDataTypeWrapper> for ConcreteDataType {
ConcreteDataType::vector_default_datatype()
}
}
ColumnDataType::List => {
if let Some(TypeExt::ListType(d)) = datatype_wrapper
.datatype_ext
.as_ref()
.and_then(|datatype_ext| datatype_ext.type_ext.as_ref())
{
let item_type = ColumnDataTypeWrapper {
datatype: d.datatype(),
datatype_ext: d.datatype_extension.clone().map(|d| *d),
};
ConcreteDataType::list_datatype(item_type.into())
} else {
// invalid state: type extension not found
ConcreteDataType::null_datatype()
}
}
ColumnDataType::Struct => {
if let Some(TypeExt::StructType(d)) = datatype_wrapper
.datatype_ext
.as_ref()
.and_then(|datatype_ext| datatype_ext.type_ext.as_ref())
{
let fields = d
.fields
.iter()
.map(|f| {
let field_type = ColumnDataTypeWrapper {
datatype: f.datatype(),
datatype_ext: f.datatype_extension.clone(),
};
StructField::new(f.name.to_string(), field_type.into(), true)
})
.collect::<Vec<_>>();
ConcreteDataType::struct_datatype(StructType::from(fields))
} else {
// invalid state: type extension not found
ConcreteDataType::null_datatype()
}
}
}
}
}
@@ -249,6 +290,39 @@ impl ColumnDataTypeWrapper {
}),
}
}
/// Create a list datatype with the given item type.
pub fn list_datatype(item_type: ColumnDataTypeWrapper) -> Self {
ColumnDataTypeWrapper {
datatype: ColumnDataType::List,
datatype_ext: Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::ListType(Box::new(ListTypeExtension {
datatype: item_type.datatype() as i32,
datatype_extension: item_type.datatype_ext.map(Box::new),
}))),
}),
}
}
/// Create a struct datatype with the given field tuples (name, datatype).
pub fn struct_datatype(fields: Vec<(String, ColumnDataTypeWrapper)>) -> Self {
let struct_fields = fields
.into_iter()
.map(|(name, datatype)| greptime_proto::v1::StructField {
name,
datatype: datatype.datatype() as i32,
datatype_extension: datatype.datatype_ext,
})
.collect();
ColumnDataTypeWrapper {
datatype: ColumnDataType::Struct,
datatype_ext: Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::StructType(StructTypeExtension {
fields: struct_fields,
})),
}),
}
}
}
impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
@@ -290,9 +364,9 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
ConcreteDataType::Decimal128(_) => ColumnDataType::Decimal128,
ConcreteDataType::Json(_) => ColumnDataType::Json,
ConcreteDataType::Vector(_) => ColumnDataType::Vector,
ConcreteDataType::List(_) => ColumnDataType::List,
ConcreteDataType::Struct(_) => ColumnDataType::Struct,
ConcreteDataType::Null(_)
| ConcreteDataType::List(_)
| ConcreteDataType::Struct(_)
| ConcreteDataType::Dictionary(_)
| ConcreteDataType::Duration(_) => {
return error::IntoColumnDataTypeSnafu { from: datatype }.fail();
@@ -321,6 +395,40 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
})),
})
}
ColumnDataType::List => {
if let Some(list_type) = datatype.as_list() {
let list_item_type =
ColumnDataTypeWrapper::try_from(list_type.item_type().clone())?;
Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::ListType(Box::new(ListTypeExtension {
datatype: list_item_type.datatype.into(),
datatype_extension: list_item_type.datatype_ext.map(Box::new),
}))),
})
} else {
None
}
}
ColumnDataType::Struct => {
if let Some(struct_type) = datatype.as_struct() {
let mut fields = Vec::with_capacity(struct_type.fields().len());
for field in struct_type.fields() {
let field_type =
ColumnDataTypeWrapper::try_from(field.data_type().clone())?;
let proto_field = crate::v1::StructField {
name: field.name().to_string(),
datatype: field_type.datatype.into(),
datatype_extension: field_type.datatype_ext,
};
fields.push(proto_field);
}
Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::StructType(StructTypeExtension { fields })),
})
} else {
None
}
}
_ => None,
};
Ok(Self {
@@ -448,56 +556,17 @@ pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values
binary_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::List => Values {
list_values: Vec::with_capacity(capacity),
..Default::default()
},
ColumnDataType::Struct => Values {
struct_values: Vec::with_capacity(capacity),
..Default::default()
},
}
}
// The type of vals must be same.
pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) {
let values = column.values.get_or_insert_with(Values::default);
let mut null_mask = BitVec::from_slice(&column.null_mask);
let len = vector.len();
null_mask.reserve_exact(origin_count + len);
null_mask.extend(BitVec::repeat(false, len));
(0..len).for_each(|idx| match vector.get(idx) {
Value::Null => null_mask.set(idx + origin_count, true),
Value::Boolean(val) => values.bool_values.push(val),
Value::UInt8(val) => values.u8_values.push(val.into()),
Value::UInt16(val) => values.u16_values.push(val.into()),
Value::UInt32(val) => values.u32_values.push(val),
Value::UInt64(val) => values.u64_values.push(val),
Value::Int8(val) => values.i8_values.push(val.into()),
Value::Int16(val) => values.i16_values.push(val.into()),
Value::Int32(val) => values.i32_values.push(val),
Value::Int64(val) => values.i64_values.push(val),
Value::Float32(val) => values.f32_values.push(*val),
Value::Float64(val) => values.f64_values.push(*val),
Value::String(val) => values.string_values.push(val.as_utf8().to_string()),
Value::Binary(val) => values.binary_values.push(val.to_vec()),
Value::Date(val) => values.date_values.push(val.val()),
Value::Timestamp(val) => match val.unit() {
TimeUnit::Second => values.timestamp_second_values.push(val.value()),
TimeUnit::Millisecond => values.timestamp_millisecond_values.push(val.value()),
TimeUnit::Microsecond => values.timestamp_microsecond_values.push(val.value()),
TimeUnit::Nanosecond => values.timestamp_nanosecond_values.push(val.value()),
},
Value::Time(val) => match val.unit() {
TimeUnit::Second => values.time_second_values.push(val.value()),
TimeUnit::Millisecond => values.time_millisecond_values.push(val.value()),
TimeUnit::Microsecond => values.time_microsecond_values.push(val.value()),
TimeUnit::Nanosecond => values.time_nanosecond_values.push(val.value()),
},
Value::IntervalYearMonth(val) => values.interval_year_month_values.push(val.to_i32()),
Value::IntervalDayTime(val) => values.interval_day_time_values.push(val.to_i64()),
Value::IntervalMonthDayNano(val) => values
.interval_month_day_nano_values
.push(convert_month_day_nano_to_pb(val)),
Value::Decimal128(val) => values.decimal128_values.push(convert_to_pb_decimal128(val)),
Value::List(_) | Value::Duration(_) => unreachable!(),
});
column.null_mask = null_mask.into_vec();
}
/// Returns the type name of the [Request].
pub fn request_type(request: &Request) -> &'static str {
match request {
@@ -555,7 +624,7 @@ pub fn convert_to_pb_decimal128(v: Decimal128) -> v1::Decimal128 {
pub fn pb_value_to_value_ref<'a>(
value: &'a v1::Value,
datatype_ext: &'a Option<ColumnDataTypeExtension>,
datatype_ext: Option<&'a ColumnDataTypeExtension>,
) -> ValueRef<'a> {
let Some(value) = &value.value_data else {
return ValueRef::Null;
@@ -622,6 +691,77 @@ pub fn pb_value_to_value_ref<'a>(
))
}
}
ValueData::ListValue(list) => {
let list_datatype_ext = datatype_ext
.as_ref()
.and_then(|ext| {
if let Some(TypeExt::ListType(l)) = &ext.type_ext {
Some(l)
} else {
None
}
})
.expect("list must contain datatype ext");
let item_type = ConcreteDataType::from(ColumnDataTypeWrapper::new(
list_datatype_ext.datatype(),
list_datatype_ext
.datatype_extension
.as_ref()
.map(|ext| *ext.clone()),
));
let items = list
.items
.iter()
.map(|item| {
pb_value_to_value_ref(item, list_datatype_ext.datatype_extension.as_deref())
})
.collect::<Vec<_>>();
let list_value = ListValueRef::RefList {
val: items,
item_datatype: item_type.clone(),
};
ValueRef::List(list_value)
}
ValueData::StructValue(struct_value) => {
let struct_datatype_ext = datatype_ext
.as_ref()
.and_then(|ext| {
if let Some(TypeExt::StructType(s)) = &ext.type_ext {
Some(s)
} else {
None
}
})
.expect("struct must contain datatype ext");
let struct_fields = struct_datatype_ext
.fields
.iter()
.map(|field| {
let field_type = ConcreteDataType::from(ColumnDataTypeWrapper::new(
field.datatype(),
field.datatype_extension.clone(),
));
let field_name = field.name.to_string();
StructField::new(field_name, field_type, true)
})
.collect::<Vec<_>>();
let items = struct_value
.items
.iter()
.zip(struct_datatype_ext.fields.iter())
.map(|(item, field)| pb_value_to_value_ref(item, field.datatype_extension.as_ref()))
.collect::<Vec<ValueRef>>();
let struct_value_ref = StructValueRef::RefList {
val: items,
fields: StructType::new(struct_fields),
};
ValueRef::Struct(struct_value_ref)
}
}
}
@@ -896,8 +1036,8 @@ pub fn is_column_type_value_eq(
}
/// Convert value into proto's value.
pub fn to_proto_value(value: Value) -> Option<v1::Value> {
let proto_value = match value {
pub fn to_proto_value(value: Value) -> v1::Value {
match value {
Value::Null => v1::Value { value_data: None },
Value::Boolean(v) => v1::Value {
value_data: Some(ValueData::BoolValue(v)),
@@ -983,10 +1123,34 @@ pub fn to_proto_value(value: Value) -> Option<v1::Value> {
Value::Decimal128(v) => v1::Value {
value_data: Some(ValueData::Decimal128Value(convert_to_pb_decimal128(v))),
},
Value::List(_) | Value::Duration(_) => return None,
};
Value::List(list_value) => v1::Value {
value_data: Some(ValueData::ListValue(v1::ListValue {
items: convert_list_to_pb_values(list_value),
})),
},
Value::Struct(struct_value) => v1::Value {
value_data: Some(ValueData::StructValue(v1::StructValue {
items: convert_struct_to_pb_values(struct_value),
})),
},
Value::Duration(_) => v1::Value { value_data: None },
}
}
Some(proto_value)
fn convert_list_to_pb_values(list_value: ListValue) -> Vec<v1::Value> {
list_value
.take_items()
.into_iter()
.map(to_proto_value)
.collect()
}
fn convert_struct_to_pb_values(struct_value: StructValue) -> Vec<v1::Value> {
struct_value
.take_items()
.into_iter()
.map(to_proto_value)
.collect()
}
/// Returns the [ColumnDataTypeWrapper] of the value.
@@ -1021,6 +1185,8 @@ pub fn proto_value_type(value: &v1::Value) -> Option<ColumnDataType> {
ValueData::IntervalDayTimeValue(_) => ColumnDataType::IntervalDayTime,
ValueData::IntervalMonthDayNanoValue(_) => ColumnDataType::IntervalMonthDayNano,
ValueData::Decimal128Value(_) => ColumnDataType::Decimal128,
ValueData::ListValue(_) => ColumnDataType::List,
ValueData::StructValue(_) => ColumnDataType::Struct,
};
Some(value_type)
}
@@ -1075,7 +1241,23 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue {
convert_month_day_nano_to_pb(v),
)),
Value::Decimal128(v) => Some(ValueData::Decimal128Value(convert_to_pb_decimal128(v))),
Value::List(_) | Value::Duration(_) => unreachable!(),
Value::List(list_value) => {
let items = list_value
.take_items()
.into_iter()
.map(value_to_grpc_value)
.collect();
Some(ValueData::ListValue(v1::ListValue { items }))
}
Value::Struct(struct_value) => {
let items = struct_value
.take_items()
.into_iter()
.map(value_to_grpc_value)
.collect();
Some(ValueData::StructValue(v1::StructValue { items }))
}
Value::Duration(_) => unreachable!(),
},
}
}
@@ -1173,15 +1355,11 @@ mod tests {
TimeMillisecondType, TimeSecondType, TimestampMillisecondType, TimestampSecondType,
UInt32Type,
};
use datatypes::vectors::{
BooleanVector, IntervalDayTimeVector, IntervalMonthDayNanoVector, IntervalYearMonthVector,
TimeMicrosecondVector, TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector,
TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
TimestampSecondVector, Vector,
};
use datatypes::vectors::BooleanVector;
use paste::paste;
use super::*;
use crate::v1::Column;
#[test]
fn test_values_with_capacity() {
@@ -1260,6 +1438,14 @@ mod tests {
let values = values_with_capacity(ColumnDataType::Vector, 2);
let values = values.binary_values;
assert_eq!(2, values.capacity());
let values = values_with_capacity(ColumnDataType::List, 2);
let values = values.list_values;
assert_eq!(2, values.capacity());
let values = values_with_capacity(ColumnDataType::Struct, 2);
let values = values.struct_values;
assert_eq!(2, values.capacity());
}
#[test]
@@ -1352,6 +1538,37 @@ mod tests {
ConcreteDataType::vector_datatype(3),
ColumnDataTypeWrapper::vector_datatype(3).into()
);
assert_eq!(
ConcreteDataType::list_datatype(ConcreteDataType::string_datatype()),
ColumnDataTypeWrapper::list_datatype(ColumnDataTypeWrapper::string_datatype()).into()
);
let struct_type = StructType::new(vec![
StructField::new("id".to_string(), ConcreteDataType::int64_datatype(), true),
StructField::new(
"name".to_string(),
ConcreteDataType::string_datatype(),
true,
),
StructField::new("age".to_string(), ConcreteDataType::int32_datatype(), true),
StructField::new(
"address".to_string(),
ConcreteDataType::string_datatype(),
true,
),
]);
assert_eq!(
ConcreteDataType::struct_datatype(struct_type.clone()),
ColumnDataTypeWrapper::struct_datatype(vec![
("id".to_string(), ColumnDataTypeWrapper::int64_datatype()),
("name".to_string(), ColumnDataTypeWrapper::string_datatype()),
("age".to_string(), ColumnDataTypeWrapper::int32_datatype()),
(
"address".to_string(),
ColumnDataTypeWrapper::string_datatype()
)
])
.into()
);
}
#[test]
@@ -1455,176 +1672,29 @@ mod tests {
"Failed to create column datatype from Null(NullType)"
);
let result: Result<ColumnDataTypeWrapper> =
ConcreteDataType::list_datatype(ConcreteDataType::boolean_datatype()).try_into();
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"Failed to create column datatype from List(ListType { item_type: Boolean(BooleanType) })"
);
}
#[test]
fn test_column_put_timestamp_values() {
let mut column = Column {
column_name: "test".to_string(),
semantic_type: 0,
values: Some(Values {
..Default::default()
}),
null_mask: vec![],
datatype: 0,
..Default::default()
};
let vector = Arc::new(TimestampNanosecondVector::from_vec(vec![1, 2, 3]));
push_vals(&mut column, 3, vector);
assert_eq!(
vec![1, 2, 3],
column.values.as_ref().unwrap().timestamp_nanosecond_values
ColumnDataTypeWrapper::list_datatype(ColumnDataTypeWrapper::int16_datatype()),
ConcreteDataType::list_datatype(ConcreteDataType::int16_datatype())
.try_into()
.expect("Failed to create column datatype from List(ListType { item_type: Int16(Int16Type) })")
);
let vector = Arc::new(TimestampMillisecondVector::from_vec(vec![4, 5, 6]));
push_vals(&mut column, 3, vector);
assert_eq!(
vec![4, 5, 6],
column.values.as_ref().unwrap().timestamp_millisecond_values
);
let vector = Arc::new(TimestampMicrosecondVector::from_vec(vec![7, 8, 9]));
push_vals(&mut column, 3, vector);
assert_eq!(
vec![7, 8, 9],
column.values.as_ref().unwrap().timestamp_microsecond_values
);
let vector = Arc::new(TimestampSecondVector::from_vec(vec![10, 11, 12]));
push_vals(&mut column, 3, vector);
assert_eq!(
vec![10, 11, 12],
column.values.as_ref().unwrap().timestamp_second_values
);
}
#[test]
fn test_column_put_time_values() {
let mut column = Column {
column_name: "test".to_string(),
semantic_type: 0,
values: Some(Values {
..Default::default()
}),
null_mask: vec![],
datatype: 0,
..Default::default()
};
let vector = Arc::new(TimeNanosecondVector::from_vec(vec![1, 2, 3]));
push_vals(&mut column, 3, vector);
assert_eq!(
vec![1, 2, 3],
column.values.as_ref().unwrap().time_nanosecond_values
);
let vector = Arc::new(TimeMillisecondVector::from_vec(vec![4, 5, 6]));
push_vals(&mut column, 3, vector);
assert_eq!(
vec![4, 5, 6],
column.values.as_ref().unwrap().time_millisecond_values
);
let vector = Arc::new(TimeMicrosecondVector::from_vec(vec![7, 8, 9]));
push_vals(&mut column, 3, vector);
assert_eq!(
vec![7, 8, 9],
column.values.as_ref().unwrap().time_microsecond_values
);
let vector = Arc::new(TimeSecondVector::from_vec(vec![10, 11, 12]));
push_vals(&mut column, 3, vector);
assert_eq!(
vec![10, 11, 12],
column.values.as_ref().unwrap().time_second_values
);
}
#[test]
fn test_column_put_interval_values() {
let mut column = Column {
column_name: "test".to_string(),
semantic_type: 0,
values: Some(Values {
..Default::default()
}),
null_mask: vec![],
datatype: 0,
..Default::default()
};
let vector = Arc::new(IntervalYearMonthVector::from_vec(vec![1, 2, 3]));
push_vals(&mut column, 3, vector);
assert_eq!(
vec![1, 2, 3],
column.values.as_ref().unwrap().interval_year_month_values
);
let vector = Arc::new(IntervalDayTimeVector::from_vec(vec![
IntervalDayTime::new(0, 4).into(),
IntervalDayTime::new(0, 5).into(),
IntervalDayTime::new(0, 6).into(),
]));
push_vals(&mut column, 3, vector);
assert_eq!(
vec![4, 5, 6],
column.values.as_ref().unwrap().interval_day_time_values
);
let vector = Arc::new(IntervalMonthDayNanoVector::from_vec(vec![
IntervalMonthDayNano::new(0, 0, 7).into(),
IntervalMonthDayNano::new(0, 0, 8).into(),
IntervalMonthDayNano::new(0, 0, 9).into(),
]));
let len = vector.len();
push_vals(&mut column, 3, vector);
(0..len).for_each(|i| {
assert_eq!(
7 + i as i64,
column
.values
.as_ref()
.unwrap()
.interval_month_day_nano_values
.get(i)
.unwrap()
.nanoseconds
);
});
}
#[test]
fn test_column_put_vector() {
use crate::v1::SemanticType;
// Some(false), None, Some(true), Some(true)
let mut column = Column {
column_name: "test".to_string(),
semantic_type: SemanticType::Field as i32,
values: Some(Values {
bool_values: vec![false, true, true],
..Default::default()
}),
null_mask: vec![2],
datatype: ColumnDataType::Boolean as i32,
..Default::default()
};
let row_count = 4;
let vector = Arc::new(BooleanVector::from(vec![Some(true), None, Some(false)]));
push_vals(&mut column, row_count, vector);
// Some(false), None, Some(true), Some(true), Some(true), None, Some(false)
let bool_values = column.values.unwrap().bool_values;
assert_eq!(vec![false, true, true, true, false], bool_values);
let null_mask = column.null_mask;
assert_eq!(34, null_mask[0]);
ColumnDataTypeWrapper::struct_datatype(vec![
("a".to_string(), ColumnDataTypeWrapper::int64_datatype()),
(
"a.a".to_string(),
ColumnDataTypeWrapper::list_datatype(ColumnDataTypeWrapper::string_datatype())
)
]),
ConcreteDataType::struct_datatype(StructType::new(vec![
StructField::new("a".to_string(), ConcreteDataType::int64_datatype(), true),
StructField::new(
"a.a".to_string(),
ConcreteDataType::list_datatype(ConcreteDataType::string_datatype()), true
)
])).try_into().expect("Failed to create column datatype from Struct(StructType { fields: [StructField { name: \"a\", data_type: Int64(Int64Type) }, StructField { name: \"a.a\", data_type: List(ListType { item_type: String(StringType) }) }] })")
)
}
#[test]
@@ -2000,4 +2070,50 @@ mod tests {
assert_eq!(pb_decimal.lo, 123);
assert_eq!(pb_decimal.hi, 0);
}
#[test]
fn test_list_to_pb_value() {
let value = Value::List(ListValue::new(
vec![Value::Boolean(true)],
ConcreteDataType::boolean_datatype(),
));
let pb_value = to_proto_value(value);
match pb_value.value_data.unwrap() {
ValueData::ListValue(pb_list_value) => {
assert_eq!(pb_list_value.items.len(), 1);
}
_ => panic!("Unexpected value type"),
}
}
#[test]
fn test_struct_to_pb_value() {
let items = vec![Value::Boolean(true), Value::String("tom".into())];
let value = Value::Struct(
StructValue::try_new(
items,
StructType::new(vec![
StructField::new(
"a.a".to_string(),
ConcreteDataType::boolean_datatype(),
true,
),
StructField::new("a.b".to_string(), ConcreteDataType::string_datatype(), true),
]),
)
.unwrap(),
);
let pb_value = to_proto_value(value);
match pb_value.value_data.unwrap() {
ValueData::StructValue(pb_struct_value) => {
assert_eq!(pb_struct_value.items.len(), 2);
}
_ => panic!("Unexpected value type"),
}
}
}

View File

@@ -37,8 +37,10 @@ const SKIPPING_INDEX_GRPC_KEY: &str = "skipping_index";
/// Tries to construct a `ColumnSchema` from the given `ColumnDef`.
pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
let data_type =
ColumnDataTypeWrapper::try_new(column_def.data_type, column_def.datatype_extension)?;
let data_type = ColumnDataTypeWrapper::try_new(
column_def.data_type,
column_def.datatype_extension.clone(),
)?;
let constraint = if column_def.default_constraint.is_empty() {
None

View File

@@ -38,7 +38,7 @@ pub(crate) fn one_of_sigs2(args1: Vec<DataType>, args2: Vec<DataType>) -> Signat
/// Cast a [`ValueRef`] to u64, returns `None` if fails
pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
cast((*value).into(), &ConcreteDataType::uint64_datatype())
cast(value.clone().into(), &ConcreteDataType::uint64_datatype())
.context(InvalidInputTypeSnafu {
err_msg: format!(
"Failed to cast input into uint64, actual type: {:#?}",
@@ -50,7 +50,7 @@ pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
/// Cast a [`ValueRef`] to u32, returns `None` if fails
pub fn cast_u32(value: &ValueRef) -> Result<Option<u32>> {
cast((*value).into(), &ConcreteDataType::uint32_datatype())
cast(value.clone().into(), &ConcreteDataType::uint32_datatype())
.context(InvalidInputTypeSnafu {
err_msg: format!(
"Failed to cast input into uint32, actual type: {:#?}",

View File

@@ -41,7 +41,7 @@ where
let right: &<R as Scalar>::VectorType = unsafe { Helper::static_cast(right.inner()) };
let b = right.get_data(0);
let it = left.iter_data().map(|a| f(a, b, ctx));
let it = left.iter_data().map(|a| f(a, b.clone(), ctx));
<O as Scalar>::VectorType::from_owned_iterator(it)
}
@@ -62,7 +62,7 @@ where
let a = left.get_data(0);
let right: &<R as Scalar>::VectorType = unsafe { Helper::static_cast(r) };
let it = right.iter_data().map(|b| f(a, b, ctx));
let it = right.iter_data().map(|b| f(a.clone(), b, ctx));
<O as Scalar>::VectorType::from_owned_iterator(it)
}

View File

@@ -46,7 +46,7 @@ pub(crate) fn add_values_to_builder(
Some(true) => builder.push_null(),
_ => {
builder
.try_push_value_ref(values[idx_of_values].as_value_ref())
.try_push_value_ref(&values[idx_of_values].as_value_ref())
.context(CreateVectorSnafu)?;
idx_of_values += 1
}

View File

@@ -167,7 +167,7 @@ pub fn build_create_table_expr(
default_constraint: vec![],
semantic_type,
comment: String::new(),
datatype_extension: *datatype_extension,
datatype_extension: datatype_extension.clone(),
options: options.clone(),
});
}
@@ -209,7 +209,7 @@ pub fn extract_new_columns(
default_constraint: vec![],
semantic_type: expr.semantic_type,
comment: String::new(),
datatype_extension: *expr.datatype_extension,
datatype_extension: expr.datatype_extension.clone(),
options: expr.options.clone(),
});
AddColumn {
@@ -425,7 +425,7 @@ mod tests {
ConcreteDataType::from(
ColumnDataTypeWrapper::try_new(
decimal_column.data_type,
decimal_column.datatype_extension,
decimal_column.datatype_extension.clone(),
)
.unwrap()
)
@@ -520,6 +520,7 @@ mod tests {
.as_ref()
.unwrap()
.datatype_extension
.clone()
)
.unwrap()
)

View File

@@ -325,7 +325,7 @@ fn build_struct(
let result = #fn_name(handler, query_ctx, &[]).await
.map_err(|e| datafusion_common::DataFusionError::Execution(format!("Function execution error: {}", e.output_msg())))?;
builder.push_value_ref(result.as_value_ref());
builder.push_value_ref(&result.as_value_ref());
} else {
for i in 0..rows_num {
let args: Vec<_> = columns.iter()
@@ -335,7 +335,7 @@ fn build_struct(
let result = #fn_name(handler, query_ctx, &args).await
.map_err(|e| datafusion_common::DataFusionError::Execution(format!("Function execution error: {}", e.output_msg())))?;
builder.push_value_ref(result.as_value_ref());
builder.push_value_ref(&result.as_value_ref());
}
}

View File

@@ -90,6 +90,24 @@ fn impl_schema_method(fields: &[ParsedField<'_>]) -> Result<TokenStream2> {
Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::VectorType(VectorTypeExtension { dim: #dim })) })
}
}
Some(TypeExt::ListType(ext)) => {
let item_type = syn::Ident::new(&ext.datatype.to_string(), ident.span());
quote! {
Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::ListType(ListTypeExtension { item_type: #item_type })) })
}
}
Some(TypeExt::StructType(ext)) => {
let fields = ext.fields.iter().map(|field| {
let field_name = syn::Ident::new(&field.name.to_string(), ident.span());
let field_type = syn::Ident::new(&field.datatype.to_string(), ident.span());
quote! {
StructField { name: #field_name, type_: #field_type }
}
}).collect::<Vec<_>>();
quote! {
Some(ColumnDataTypeExtension { type_ext: Some(TypeExt::StructType(StructTypeExtension { fields: [#(#fields),*] })) })
}
}
None => {
quote! { None }
}

View File

@@ -184,7 +184,7 @@ pub(crate) fn convert_semantic_type_to_proto_semantic_type(
}
}
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone)]
pub(crate) struct ColumnDataTypeWithExtension {
pub(crate) data_type: ColumnDataType,
pub(crate) extension: Option<ColumnDataTypeExtension>,
@@ -260,7 +260,10 @@ pub(crate) fn get_column_data_type(
infer_column_data_type: &Option<ColumnDataTypeWithExtension>,
attribute: &ColumnAttribute,
) -> Option<ColumnDataTypeWithExtension> {
attribute.datatype.or(*infer_column_data_type)
attribute
.datatype
.clone()
.or_else(|| infer_column_data_type.clone())
}
/// Convert a column data type to a value data ident.
@@ -304,5 +307,7 @@ pub(crate) fn convert_column_data_type_to_value_data_ident(
// Json is a special case, it is actually a string column.
ColumnDataType::Json => format_ident!("StringValue"),
ColumnDataType::Vector => format_ident!("VectorValue"),
ColumnDataType::List => format_ident!("ListValue"),
ColumnDataType::Struct => format_ident!("StructValue"),
}
}

View File

@@ -90,7 +90,7 @@ pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result<Crea
default_constraint: c.default_constraint.clone(),
semantic_type: semantic_type as i32,
comment: String::new(),
datatype_extension: c.datatype_extension,
datatype_extension: c.datatype_extension.clone(),
options: c.options.clone(),
}),
column_id: i as u32,

View File

@@ -220,7 +220,7 @@ pub fn sql_value_to_value(
_ => return InvalidUnaryOpSnafu { unary_op, value }.fail(),
},
Value::String(_) | Value::Binary(_) | Value::List(_) => {
Value::String(_) | Value::Binary(_) | Value::List(_) | Value::Struct(_) => {
return InvalidUnaryOpSnafu { unary_op, value }.fail();
}
}

View File

@@ -285,6 +285,13 @@ impl ConcreteDataType {
}
}
pub fn as_struct(&self) -> Option<&StructType> {
match self {
ConcreteDataType::Struct(s) => Some(s),
_ => None,
}
}
/// Try to cast data type as a [`TimestampType`].
pub fn as_timestamp(&self) -> Option<TimestampType> {
match self {

View File

@@ -220,6 +220,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to convert scalar value to Arrow array"))]
ConvertScalarToArrowArray {
#[snafu(source)]
error: datafusion_common::DataFusionError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse extended type in metadata: {}", value))]
ParseExtendedType {
value: String,
@@ -238,6 +246,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Inconsistent struct field count {field_len} and item count {item_len}"))]
InconsistentStructFieldsAndItems {
field_len: usize,
item_len: usize,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to process JSONB value"))]
InvalidJsonb {
error: jsonb::Error,
@@ -282,7 +297,9 @@ impl ErrorExt for Error {
| ToScalarValue { .. }
| TryFromValue { .. }
| ConvertArrowArrayToScalars { .. }
| ParseExtendedType { .. } => StatusCode::Internal,
| ConvertScalarToArrowArray { .. }
| ParseExtendedType { .. }
| InconsistentStructFieldsAndItems { .. } => StatusCode::Internal,
}
}

View File

@@ -21,10 +21,10 @@ use crate::types::{
Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type, UInt8Type, UInt16Type,
UInt32Type, UInt64Type,
};
use crate::value::{ListValue, ListValueRef, Value};
use crate::value::{ListValue, ListValueRef, StructValue, StructValueRef, Value};
use crate::vectors::{
BinaryVector, BooleanVector, DateVector, Decimal128Vector, ListVector, MutableVector,
PrimitiveVector, StringVector, Vector,
NullVector, PrimitiveVector, StringVector, StructVector, Vector,
};
fn get_iter_capacity<T, I: Iterator<Item = T>>(iter: &I) -> usize {
@@ -52,7 +52,7 @@ where
fn upcast_gat<'short, 'long: 'short>(long: Self::RefType<'long>) -> Self::RefType<'short>;
}
pub trait ScalarRef<'a>: std::fmt::Debug + Clone + Copy + Send + 'a {
pub trait ScalarRef<'a>: std::fmt::Debug + Clone + Send + 'a {
/// The corresponding [`Scalar`] type.
type ScalarType: Scalar<RefType<'a> = Self>;
@@ -95,7 +95,7 @@ where
fn from_slice(data: &[Self::RefItem<'_>]) -> Self {
let mut builder = Self::Builder::with_capacity(data.len());
for item in data {
builder.push(Some(*item));
builder.push(Some(item.clone()));
}
builder.finish()
}
@@ -152,13 +152,11 @@ macro_rules! impl_scalar_for_native {
type VectorType = PrimitiveVector<$DataType>;
type RefType<'a> = $Native;
#[inline]
fn as_scalar_ref(&self) -> $Native {
*self
}
#[allow(clippy::needless_lifetimes)]
#[inline]
fn upcast_gat<'short, 'long: 'short>(long: $Native) -> $Native {
long
}
@@ -168,7 +166,6 @@ macro_rules! impl_scalar_for_native {
impl<'a> ScalarRef<'a> for $Native {
type ScalarType = $Native;
#[inline]
fn to_owned_scalar(&self) -> $Native {
*self
}
@@ -187,17 +184,33 @@ impl_scalar_for_native!(i64, Int64Type);
impl_scalar_for_native!(f32, Float32Type);
impl_scalar_for_native!(f64, Float64Type);
/// The unit type is backed by [NullVector]; its scalar reference carries no data.
impl Scalar for () {
    type VectorType = NullVector;
    type RefType<'a> = ();

    // Unit has no payload, so the "reference" is unit as well.
    fn as_scalar_ref(&self) {}

    #[allow(clippy::needless_lifetimes)]
    fn upcast_gat<'short, 'long: 'short>(long: ()) {
        long
    }
}
/// Owning back a unit scalar is trivially unit again.
impl ScalarRef<'_> for () {
    type ScalarType = ();

    fn to_owned_scalar(&self) {}
}
impl Scalar for bool {
type VectorType = BooleanVector;
type RefType<'a> = bool;
#[inline]
fn as_scalar_ref(&self) -> bool {
*self
}
#[allow(clippy::needless_lifetimes)]
#[inline]
fn upcast_gat<'short, 'long: 'short>(long: bool) -> bool {
long
}
@@ -206,7 +219,6 @@ impl Scalar for bool {
impl ScalarRef<'_> for bool {
type ScalarType = bool;
#[inline]
fn to_owned_scalar(&self) -> bool {
*self
}
@@ -216,12 +228,10 @@ impl Scalar for String {
type VectorType = StringVector;
type RefType<'a> = &'a str;
#[inline]
fn as_scalar_ref(&self) -> &str {
self
}
#[inline]
fn upcast_gat<'short, 'long: 'short>(long: &'long str) -> &'short str {
long
}
@@ -230,7 +240,6 @@ impl Scalar for String {
impl<'a> ScalarRef<'a> for &'a str {
type ScalarType = String;
#[inline]
fn to_owned_scalar(&self) -> String {
self.to_string()
}
@@ -240,12 +249,10 @@ impl Scalar for Vec<u8> {
type VectorType = BinaryVector;
type RefType<'a> = &'a [u8];
#[inline]
fn as_scalar_ref(&self) -> &[u8] {
self
}
#[inline]
fn upcast_gat<'short, 'long: 'short>(long: &'long [u8]) -> &'short [u8] {
long
}
@@ -254,7 +261,6 @@ impl Scalar for Vec<u8> {
impl<'a> ScalarRef<'a> for &'a [u8] {
type ScalarType = Vec<u8>;
#[inline]
fn to_owned_scalar(&self) -> Vec<u8> {
self.to_vec()
}
@@ -332,6 +338,42 @@ impl<'a> ScalarRef<'a> for ListValueRef<'a> {
_ => unreachable!(),
},
ListValueRef::Ref { val } => (*val).clone(),
ListValueRef::RefList { val, item_datatype } => ListValue::new(
val.iter().map(|v| Value::from(v.clone())).collect(),
item_datatype.clone(),
),
}
}
}
/// [StructValue] scalars are stored in [StructVector]; their cheap reference
/// form is [StructValueRef].
impl Scalar for StructValue {
    type VectorType = StructVector;
    type RefType<'a> = StructValueRef<'a>;

    // Borrow the owned value without cloning any field data.
    fn as_scalar_ref(&self) -> Self::RefType<'_> {
        StructValueRef::Ref(self)
    }

    // Lifetime shortening only; no data is touched.
    fn upcast_gat<'short, 'long: 'short>(long: Self::RefType<'long>) -> Self::RefType<'short> {
        long
    }
}
impl<'a> ScalarRef<'a> for StructValueRef<'a> {
    type ScalarType = StructValue;

    /// Materializes the reference into an owned [StructValue], cloning the
    /// underlying data.
    fn to_owned_scalar(&self) -> Self::ScalarType {
        match self {
            Self::Indexed { vector, idx } => match vector.get(*idx) {
                // A null slot in the vector maps to the empty struct value.
                Value::Null => StructValue::default(),
                Value::Struct(v) => v,
                // NOTE(review): assumes a StructVector can only yield Null or
                // Struct values at a valid index — confirm against StructVector::get.
                _ => unreachable!(),
            },
            StructValueRef::Ref(val) => (*val).clone(),
            StructValueRef::RefList { val, fields } => {
                let items = val.iter().map(|v| Value::from(v.clone())).collect();
                // Invariant: a RefList is constructed with one value per field,
                // so try_new cannot fail here.
                StructValue::try_new(items, fields.clone()).unwrap()
            }
        }
    }
}
@@ -346,7 +388,7 @@ mod tests {
fn build_vector_from_slice<T: ScalarVector>(items: &[Option<T::RefItem<'_>>]) -> T {
let mut builder = T::Builder::with_capacity(items.len());
for item in items {
builder.push(*item);
builder.push(item.clone());
}
builder.finish()
}

View File

@@ -296,7 +296,7 @@ impl ColumnSchema {
let value_ref = padding_value.as_value_ref();
let mut mutable_vector = self.data_type.create_mutable_vector(num_rows);
for _ in 0..num_rows {
mutable_vector.push_value_ref(value_ref);
mutable_vector.push_value_ref(&value_ref);
}
mutable_vector.to_vector()
}

View File

@@ -157,7 +157,7 @@ impl ColumnDefaultConstraint {
// attempt to downcast the vector fail if they don't check whether the vector is const
// first.
let mut mutable_vector = data_type.create_mutable_vector(1);
mutable_vector.try_push_value_ref(v.as_value_ref())?;
mutable_vector.try_push_value_ref(&v.as_value_ref())?;
let base_vector = mutable_vector.to_vector();
Ok(base_vector.replicate(&[num_rows]))
}

View File

@@ -83,10 +83,10 @@ impl LogicalPrimitiveType for DateType {
})
}
fn cast_value_ref(value: ValueRef) -> Result<Option<Date>> {
fn cast_value_ref(value: &ValueRef) -> Result<Option<Date>> {
match value {
ValueRef::Null => Ok(None),
ValueRef::Date(v) => Ok(Some(v)),
ValueRef::Date(v) => Ok(Some(*v)),
other => error::CastTypeSnafu {
msg: format!("Failed to cast value {other:?} to Date"),
}

View File

@@ -133,11 +133,11 @@ macro_rules! impl_data_type_for_duration {
})
}
fn cast_value_ref(value: ValueRef) -> crate::Result<Option<Self::Wrapper>> {
fn cast_value_ref(value: &ValueRef) -> crate::Result<Option<Self::Wrapper>> {
match value {
ValueRef::Null => Ok(None),
ValueRef::Duration(t) => match t.unit() {
TimeUnit::$unit => Ok(Some([<Duration $unit>](t))),
TimeUnit::$unit => Ok(Some([<Duration $unit>](*t))),
other => error::CastTypeSnafu {
msg: format!(
"Failed to cast Duration value with different unit {:?} to {}",

View File

@@ -123,10 +123,10 @@ macro_rules! impl_data_type_for_interval {
})
}
fn cast_value_ref(value: ValueRef) -> crate::Result<Option<Self::Wrapper>> {
fn cast_value_ref(value: &ValueRef) -> crate::Result<Option<Self::Wrapper>> {
match value {
ValueRef::Null => Ok(None),
ValueRef::[<Interval $unit>](t) => Ok(Some(t)),
ValueRef::[<Interval $unit>](t) => Ok(Some(*t)),
other => error::CastTypeSnafu {
msg: format!("Failed to cast value {:?} to {}", other, stringify!([<Interval $unit>])),
}

View File

@@ -65,7 +65,11 @@ impl DataType for ListType {
}
fn as_arrow_type(&self) -> ArrowDataType {
let field = Arc::new(Field::new("item", self.item_type.as_arrow_type(), true));
let field = Arc::new(Field::new(
Field::LIST_FIELD_DEFAULT_NAME,
self.item_type.as_arrow_type(),
true,
));
ArrowDataType::List(field)
}

View File

@@ -80,7 +80,7 @@ pub trait LogicalPrimitiveType: 'static + Sized {
fn cast_vector(vector: &dyn Vector) -> Result<&PrimitiveVector<Self>>;
/// Cast value ref to the primitive type.
fn cast_value_ref(value: ValueRef) -> Result<Option<Self::Wrapper>>;
fn cast_value_ref(value: &ValueRef) -> Result<Option<Self::Wrapper>>;
}
/// A new type for [WrapperType], complement the `Ord` feature for it.
@@ -194,10 +194,10 @@ macro_rules! define_logical_primitive_type {
})
}
fn cast_value_ref(value: ValueRef) -> Result<Option<$Native>> {
fn cast_value_ref(value: &ValueRef) -> Result<Option<$Native>> {
match value {
ValueRef::Null => Ok(None),
ValueRef::$TypeId(v) => Ok(Some(v.into())),
ValueRef::$TypeId(v) => Ok(Some((*v).into())),
other => error::CastTypeSnafu {
msg: format!(
"Failed to cast value {:?} to primitive type {}",

View File

@@ -89,8 +89,8 @@ impl DataType for StringType {
Value::Duration(v) => Some(Value::String(StringBytes::from(v.to_string()))),
Value::Decimal128(v) => Some(Value::String(StringBytes::from(v.to_string()))),
// StringBytes is only support for utf-8, Value::Binary is not allowed.
Value::Binary(_) | Value::List(_) => None,
// StringBytes is only support for utf-8, Value::Binary and collections are not allowed.
Value::Binary(_) | Value::List(_) | Value::Struct(_) => None,
}
}
}

View File

@@ -18,6 +18,7 @@ use serde::{Deserialize, Serialize};
use crate::prelude::{ConcreteDataType, DataType, LogicalTypeId};
use crate::value::Value;
use crate::vectors::StructVectorBuilder;
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct StructType {
@@ -68,16 +69,15 @@ impl DataType for StructType {
}
fn as_arrow_type(&self) -> ArrowDataType {
let fields = self
.fields
.iter()
.map(|f| Field::new(f.name.clone(), f.data_type.as_arrow_type(), f.nullable))
.collect();
let fields = self.as_arrow_fields();
ArrowDataType::Struct(fields)
}
fn create_mutable_vector(&self, _capacity: usize) -> Box<dyn crate::prelude::MutableVector> {
unimplemented!("What is the mutable vector for StructVector?");
fn create_mutable_vector(&self, capacity: usize) -> Box<dyn crate::prelude::MutableVector> {
Box::new(StructVectorBuilder::with_type_and_capacity(
self.clone(),
capacity,
))
}
fn try_cast(&self, _from: Value) -> Option<Value> {
@@ -94,6 +94,13 @@ impl StructType {
pub fn fields(&self) -> &[StructField] {
&self.fields
}
/// Converts this struct's field definitions into Arrow [`Fields`],
/// preserving name, data type, and nullability of each field.
pub fn as_arrow_fields(&self) -> Fields {
    self.fields
        .iter()
        .map(|f| Field::new(f.name.clone(), f.data_type.as_arrow_type(), f.nullable))
        .collect()
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]

View File

@@ -149,14 +149,14 @@ macro_rules! impl_data_type_for_time {
})
}
fn cast_value_ref(value: ValueRef) -> crate::Result<Option<Self::Wrapper>> {
fn cast_value_ref(value: &ValueRef) -> crate::Result<Option<Self::Wrapper>> {
match value {
ValueRef::Null => Ok(None),
ValueRef::Int64(v) =>{
Ok(Some([<Time $unit>]::from(v)))
Ok(Some([<Time $unit>]::from(*v)))
}
ValueRef::Time(t) => match t.unit() {
TimeUnit::$unit => Ok(Some([<Time $unit>](t))),
TimeUnit::$unit => Ok(Some([<Time $unit>](*t))),
other => error::CastTypeSnafu {
msg: format!(
"Failed to cast Time value with different unit {:?} to {}",

View File

@@ -166,14 +166,14 @@ macro_rules! impl_data_type_for_timestamp {
})
}
fn cast_value_ref(value: ValueRef) -> crate::Result<Option<Self::Wrapper>> {
fn cast_value_ref(value: &ValueRef) -> crate::Result<Option<Self::Wrapper>> {
match value {
ValueRef::Null => Ok(None),
ValueRef::Int64(v) =>{
Ok(Some([<Timestamp $unit>]::from(v)))
Ok(Some([<Timestamp $unit>]::from(*v)))
}
ValueRef::Timestamp(t) => match t.unit() {
TimeUnit::$unit => Ok(Some([<Timestamp $unit>](t))),
TimeUnit::$unit => Ok(Some([<Timestamp $unit>](*t))),
other => error::CastTypeSnafu {
msg: format!(
"Failed to cast Timestamp value with different unit {:?} to {}",

View File

@@ -16,8 +16,7 @@ use std::cmp::Ordering;
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use arrow::datatypes::{DataType as ArrowDataType, Field};
use arrow_array::{Array, ListArray};
use arrow_array::{Array, StructArray};
use common_base::bytes::{Bytes, StringBytes};
use common_decimal::Decimal128;
use common_telemetry::error;
@@ -30,13 +29,17 @@ use datafusion_common::ScalarValue;
use datafusion_common::scalar::ScalarStructBuilder;
pub use ordered_float::OrderedFloat;
use serde::{Deserialize, Serialize, Serializer};
use serde_json::Map;
use snafu::{ResultExt, ensure};
use crate::error::{self, ConvertArrowArrayToScalarsSnafu, Error, Result, TryFromValueSnafu};
use crate::error::{
self, ConvertArrowArrayToScalarsSnafu, ConvertScalarToArrowArraySnafu, Error,
InconsistentStructFieldsAndItemsSnafu, Result, TryFromValueSnafu,
};
use crate::prelude::*;
use crate::type_id::LogicalTypeId;
use crate::types::{IntervalType, ListType};
use crate::vectors::ListVector;
use crate::types::{IntervalType, ListType, StructType};
use crate::vectors::{ListVector, StructVector};
pub type OrderedF32 = OrderedFloat<f32>;
pub type OrderedF64 = OrderedFloat<f64>;
@@ -79,6 +82,7 @@ pub enum Value {
IntervalMonthDayNano(IntervalMonthDayNano),
List(ListValue),
Struct(StructValue),
}
impl Display for Value {
@@ -128,6 +132,18 @@ impl Display for Value {
write!(f, "{}[{}]", v.datatype.name(), items)
}
Value::Decimal128(v) => write!(f, "{}", v),
Value::Struct(s) => {
let items = s
.fields
.fields()
.iter()
.map(|f| f.name())
.zip(s.items().iter())
.map(|(k, v)| format!("{k}: {v}"))
.collect::<Vec<String>>()
.join(", ");
write!(f, "{{ {items} }}")
}
}
}
}
@@ -171,6 +187,9 @@ macro_rules! define_data_type_func {
$struct::Decimal128(d) => {
ConcreteDataType::decimal128_datatype(d.precision(), d.scale())
}
$struct::Struct(struct_value) => {
ConcreteDataType::struct_datatype(struct_value.struct_type().clone())
}
}
}
};
@@ -196,6 +215,17 @@ impl Value {
}
}
/// Borrows the inner [StructValue].
///
/// Returns `Ok(None)` for [Value::Null], `Ok(Some(_))` for a struct, and a
/// cast error for every other variant.
pub fn as_struct(&self) -> Result<Option<&StructValue>> {
    match self {
        Value::Struct(v) => Ok(Some(v)),
        Value::Null => Ok(None),
        other => error::CastTypeSnafu {
            msg: format!("Failed to cast {other:?} to struct value"),
        }
        .fail(),
    }
}
/// Cast itself to [ValueRef].
pub fn as_value_ref(&self) -> ValueRef {
match self {
@@ -222,6 +252,7 @@ impl Value {
Value::IntervalMonthDayNano(v) => ValueRef::IntervalMonthDayNano(*v),
Value::Duration(v) => ValueRef::Duration(*v),
Value::Decimal128(v) => ValueRef::Decimal128(*v),
Value::Struct(v) => ValueRef::Struct(StructValueRef::Ref(v)),
}
}
@@ -379,6 +410,7 @@ impl Value {
TimeUnit::Nanosecond => LogicalTypeId::DurationNanosecond,
},
Value::Decimal128(_) => LogicalTypeId::Decimal128,
Value::Struct(_) => LogicalTypeId::Struct,
}
}
@@ -431,6 +463,10 @@ impl Value {
let (v, p, s) = d.to_scalar_value();
ScalarValue::Decimal128(v, p, s)
}
Value::Struct(struct_value) => {
let struct_type = output_type.as_struct().unwrap();
struct_value.try_to_scalar_value(struct_type)?
}
};
Ok(scalar_value)
@@ -483,7 +519,11 @@ impl Value {
Value::IntervalDayTime(x) => Some(Value::IntervalDayTime(x.negative())),
Value::IntervalMonthDayNano(x) => Some(Value::IntervalMonthDayNano(x.negative())),
Value::Binary(_) | Value::String(_) | Value::Boolean(_) | Value::List(_) => None,
Value::Binary(_)
| Value::String(_)
| Value::Boolean(_)
| Value::List(_)
| Value::Struct(_) => None,
}
}
}
@@ -543,16 +583,11 @@ pub fn to_null_scalar_value(output_type: &ConcreteDataType) -> Result<ScalarValu
IntervalType::DayTime(_) => ScalarValue::IntervalDayTime(None),
IntervalType::MonthDayNano(_) => ScalarValue::IntervalMonthDayNano(None),
},
ConcreteDataType::List(_) => ScalarValue::List(Arc::new(ListArray::new_null(
Arc::new(new_item_field(output_type.as_arrow_type())),
0,
))),
ConcreteDataType::List(list_type) => {
ScalarValue::new_null_list(list_type.item_type().as_arrow_type(), true, 1)
}
ConcreteDataType::Struct(fields) => {
let fields = fields
.fields()
.iter()
.map(|f| f.to_df_field())
.collect::<Vec<_>>();
let fields = fields.as_arrow_fields();
ScalarStructBuilder::new_null(fields)
}
ConcreteDataType::Dictionary(dict) => ScalarValue::Dictionary(
@@ -565,10 +600,6 @@ pub fn to_null_scalar_value(output_type: &ConcreteDataType) -> Result<ScalarValu
})
}
fn new_item_field(data_type: ArrowDataType) -> Field {
Field::new("item", data_type, false)
}
pub fn timestamp_to_scalar_value(unit: TimeUnit, val: Option<i64>) -> ScalarValue {
match unit {
TimeUnit::Second => ScalarValue::TimestampSecond(val, None),
@@ -819,7 +850,14 @@ impl TryFrom<Value> for serde_json::Value {
Value::String(bytes) => serde_json::Value::String(bytes.into_string()),
Value::Binary(bytes) => serde_json::to_value(bytes)?,
Value::Date(v) => serde_json::Value::Number(v.val().into()),
Value::List(v) => serde_json::to_value(v)?,
Value::List(v) => {
let items = v
.take_items()
.into_iter()
.map(serde_json::Value::try_from)
.collect::<serde_json::Result<Vec<_>>>()?;
serde_json::Value::Array(items)
}
Value::Timestamp(v) => serde_json::to_value(v.value())?,
Value::Time(v) => serde_json::to_value(v.value())?,
Value::IntervalYearMonth(v) => serde_json::to_value(v.to_i32())?,
@@ -827,6 +865,22 @@ impl TryFrom<Value> for serde_json::Value {
Value::IntervalMonthDayNano(v) => serde_json::to_value(v.to_i128())?,
Value::Duration(v) => serde_json::to_value(v.value())?,
Value::Decimal128(v) => serde_json::to_value(v.to_string())?,
Value::Struct(v) => {
let map = v
.fields
.clone() // TODO:(sunng87) remove in next patch when into_parts is merged
.fields()
.iter()
.zip(v.take_items().into_iter())
.map(|(field, value)| {
Ok((
field.name().to_string(),
serde_json::Value::try_from(value)?,
))
})
.collect::<serde_json::Result<Map<String, serde_json::Value>>>()?;
serde_json::Value::Object(map)
}
};
Ok(json_value)
@@ -854,6 +908,10 @@ impl ListValue {
&self.items
}
/// Consumes the list value and returns ownership of its items.
pub fn take_items(self) -> Vec<Value> {
    self.items
}
pub fn datatype(&self) -> &ConcreteDataType {
&self.datatype
}
@@ -903,6 +961,67 @@ impl Ord for ListValue {
}
}
/// An owned struct value: a fixed set of named fields with one [Value] per field.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct StructValue {
    // Field values, positionally aligned with `fields`.
    items: Vec<Value>,
    // Field schema; `try_new` guarantees its length equals `items.len()`.
    fields: StructType,
}
impl StructValue {
    /// Creates a struct value, validating that the number of items matches the
    /// number of declared fields.
    ///
    /// # Errors
    /// Returns [`InconsistentStructFieldsAndItemsSnafu`] when the counts differ.
    pub fn try_new(items: Vec<Value>, fields: StructType) -> Result<Self> {
        ensure!(
            items.len() == fields.fields().len(),
            InconsistentStructFieldsAndItemsSnafu {
                field_len: fields.fields().len(),
                item_len: items.len()
            }
        );
        Ok(Self { items, fields })
    }

    /// Returns the field values, in declaration order.
    pub fn items(&self) -> &[Value] {
        &self.items
    }

    /// Consumes the struct value and returns ownership of its items.
    pub fn take_items(self) -> Vec<Value> {
        self.items
    }

    /// Returns the field schema of this struct.
    pub fn struct_type(&self) -> &StructType {
        &self.fields
    }

    /// Estimated payload size in bytes, summed over the field values only
    /// (the schema itself is not counted).
    fn estimated_size(&self) -> usize {
        self.items
            .iter()
            .map(|x| x.as_value_ref().data_size())
            .sum()
    }

    /// Converts this struct into a one-row DataFusion [`ScalarValue::Struct`].
    ///
    /// Each item becomes a single-element Arrow array; the arrays are assembled
    /// into a [StructArray] using `output_type`'s Arrow fields.
    fn try_to_scalar_value(&self, output_type: &StructType) -> Result<ScalarValue> {
        let arrays = self
            .items
            .iter()
            .map(|value| {
                // NOTE(review): converts each item using its own runtime type
                // rather than the corresponding field type in `output_type` —
                // confirm this cannot diverge (e.g. for Null items).
                let scalar_value = value.try_to_scalar_value(&value.data_type())?;
                scalar_value
                    .to_array()
                    .context(ConvertScalarToArrowArraySnafu)
            })
            .collect::<Result<Vec<Arc<dyn Array>>>>()?;
        let fields = output_type.as_arrow_fields();
        // `None` nulls buffer: the struct row itself is non-null.
        let struct_array = StructArray::new(fields, arrays, None);
        Ok(ScalarValue::Struct(Arc::new(struct_array)))
    }
}
impl Default for StructValue {
    /// The empty struct value: no fields and no items.
    fn default() -> StructValue {
        // Zero items always match zero fields, so validation cannot fail.
        StructValue::try_new(Vec::new(), StructType::new(Vec::new()))
            .expect("empty struct value is always valid")
    }
}
// TODO(ruihang): Implement this type
/// Dictionary value.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
@@ -938,7 +1057,8 @@ impl TryFrom<ScalarValue> for Value {
| ScalarValue::LargeBinary(b)
| ScalarValue::FixedSizeBinary(_, b) => Value::from(b.map(Bytes::from)),
ScalarValue::List(array) => {
let datatype = ConcreteDataType::try_from(array.data_type())?;
// this is for item type
let datatype = ConcreteDataType::try_from(&array.value_type())?;
let items = ScalarValue::convert_array_to_scalar_vec(array.as_ref())
.context(ConvertArrowArrayToScalarsSnafu)?
.into_iter()
@@ -997,8 +1117,21 @@ impl TryFrom<ScalarValue> for Value {
ScalarValue::Decimal128(v, p, s) => v
.map(|v| Value::Decimal128(Decimal128::new(v, p, s)))
.unwrap_or(Value::Null),
ScalarValue::Struct(struct_array) => {
let struct_type: StructType = (struct_array.fields()).try_into()?;
let items = struct_array
.columns()
.iter()
.map(|array| {
// we only take first element from each array
let field_scalar_value = ScalarValue::try_from_array(array.as_ref(), 0)
.context(ConvertArrowArrayToScalarsSnafu)?;
field_scalar_value.try_into()
})
.collect::<Result<Vec<Value>>>()?;
Value::Struct(StructValue::try_new(items, struct_type)?)
}
ScalarValue::Decimal256(_, _, _)
| ScalarValue::Struct(_)
| ScalarValue::FixedSizeList(_)
| ScalarValue::LargeList(_)
| ScalarValue::Dictionary(_, _)
@@ -1044,12 +1177,13 @@ impl From<ValueRef<'_>> for Value {
ValueRef::Duration(v) => Value::Duration(v),
ValueRef::List(v) => v.to_value(),
ValueRef::Decimal128(v) => Value::Decimal128(v),
ValueRef::Struct(v) => v.to_value(),
}
}
}
/// Reference to [Value].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum ValueRef<'a> {
Null,
@@ -1085,13 +1219,14 @@ pub enum ValueRef<'a> {
// Compound types:
List(ListValueRef<'a>),
Struct(StructValueRef<'a>),
}
macro_rules! impl_as_for_value_ref {
($value: ident, $Variant: ident) => {
match $value {
ValueRef::Null => Ok(None),
ValueRef::$Variant(v) => Ok(Some(*v)),
ValueRef::$Variant(v) => Ok(Some(v.clone())),
other => error::CastTypeSnafu {
msg: format!(
"Failed to cast value ref {:?} to {}",
@@ -1220,6 +1355,10 @@ impl<'a> ValueRef<'a> {
impl_as_for_value_ref!(self, List)
}
/// Casts this reference to a [StructValueRef]; `Ok(None)` for null,
/// a cast error for non-struct variants.
pub fn as_struct(&self) -> Result<Option<StructValueRef>> {
    impl_as_for_value_ref!(self, Struct)
}
/// Cast itself to [Decimal128].
pub fn as_decimal128(&self) -> Result<Option<Decimal128>> {
impl_as_for_value_ref!(self, Decimal128)
@@ -1303,27 +1442,40 @@ impl<'a> From<Option<ListValueRef<'a>>> for ValueRef<'a> {
/// Now comparison still requires some allocation (call of `to_value()`) and
/// might be avoidable by downcasting and comparing the underlying array slice
/// if it becomes bottleneck.
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone)]
pub enum ListValueRef<'a> {
// TODO(yingwen): Consider replace this by VectorRef.
Indexed { vector: &'a ListVector, idx: usize },
Ref { val: &'a ListValue },
Indexed {
vector: &'a ListVector,
idx: usize,
},
Ref {
val: &'a ListValue,
},
RefList {
val: Vec<ValueRef<'a>>,
item_datatype: ConcreteDataType,
},
}
impl ListValueRef<'_> {
/// Convert self to [Value]. This method would clone the underlying data.
fn to_value(self) -> Value {
fn to_value(&self) -> Value {
match self {
ListValueRef::Indexed { vector, idx } => vector.get(idx),
ListValueRef::Ref { val } => Value::List(val.clone()),
ListValueRef::Indexed { vector, idx } => vector.get(*idx),
ListValueRef::Ref { val } => Value::List((*val).clone()),
ListValueRef::RefList { val, item_datatype } => Value::List(ListValue::new(
val.iter().map(|v| Value::from(v.clone())).collect(),
item_datatype.clone(),
)),
}
}
/// Returns the inner element's data type.
fn datatype(&self) -> ConcreteDataType {
match self {
ListValueRef::Indexed { vector, .. } => vector.data_type(),
ListValueRef::Ref { val } => val.datatype().clone(),
ListValueRef::RefList { item_datatype, .. } => item_datatype.clone(),
}
}
}
@@ -1336,6 +1488,7 @@ impl Serialize for ListValueRef<'_> {
_ => unreachable!(),
},
ListValueRef::Ref { val } => val.serialize(serializer),
ListValueRef::RefList { val, .. } => val.serialize(serializer),
}
}
}
@@ -1361,11 +1514,79 @@ impl PartialOrd for ListValueRef<'_> {
}
}
/// Reference to a [StructValue], in one of three borrowed forms.
#[derive(Debug, Clone)]
pub enum StructValueRef<'a> {
    /// A struct element inside a [StructVector], addressed by row index.
    Indexed {
        vector: &'a StructVector,
        idx: usize,
    },
    /// A borrow of an owned [StructValue].
    Ref(&'a StructValue),
    /// A struct assembled from borrowed field values plus its field schema.
    RefList {
        val: Vec<ValueRef<'a>>,
        fields: StructType,
    },
}
impl<'a> StructValueRef<'a> {
    /// Materializes the reference into an owned [Value::Struct], cloning the
    /// underlying data.
    pub fn to_value(&self) -> Value {
        match self {
            StructValueRef::Indexed { vector, idx } => vector.get(*idx),
            StructValueRef::Ref(val) => Value::Struct((*val).clone()),
            StructValueRef::RefList { val, fields } => {
                let items = val.iter().map(|v| Value::from(v.clone())).collect();
                // Invariant: a RefList is constructed with one value per field,
                // so try_new cannot fail here.
                Value::Struct(StructValue::try_new(items, fields.clone()).unwrap())
            }
        }
    }

    /// Returns the field schema of the referenced struct.
    pub fn struct_type(&self) -> &StructType {
        match self {
            StructValueRef::Indexed { vector, .. } => vector.struct_type(),
            StructValueRef::Ref(val) => val.struct_type(),
            StructValueRef::RefList { fields, .. } => fields,
        }
    }
}
/// Serializes the referenced struct by delegating to the underlying value
/// or field-value list.
impl Serialize for StructValueRef<'_> {
    fn serialize<S: Serializer>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> {
        match self {
            StructValueRef::Indexed { vector, idx } => match vector.get(*idx) {
                Value::Struct(v) => v.serialize(serializer),
                // NOTE(review): a null slot in the vector would hit this branch
                // and panic — confirm Indexed refs are never built on null rows.
                _ => unreachable!(),
            },
            StructValueRef::Ref(val) => val.serialize(serializer),
            // Serializes as a plain sequence of values (no field names).
            StructValueRef::RefList { val, .. } => val.serialize(serializer),
        }
    }
}
impl PartialEq for StructValueRef<'_> {
    /// Equality is defined on the materialized owned [Value] form.
    fn eq(&self, other: &Self) -> bool {
        self.to_value() == other.to_value()
    }
}
// Equality delegates to owned `Value` comparison, which is reflexive, so `Eq` holds.
impl Eq for StructValueRef<'_> {}
impl Ord for StructValueRef<'_> {
    /// Total order consistent with the owned [Value] ordering.
    fn cmp(&self, other: &Self) -> Ordering {
        // Respect the order of `Value` by converting into value before comparison.
        self.to_value().cmp(&other.to_value())
    }
}
impl PartialOrd for StructValueRef<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // The order is total (see `Ord`), so this never returns `None`.
        Some(self.cmp(other))
    }
}
impl ValueRef<'_> {
/// Returns the size of the underlying data in bytes,
/// The size is estimated and only considers the data size.
pub fn data_size(&self) -> usize {
match *self {
match self {
// Since the `Null` type is also considered to occupy space, we have opted to use the
// size of `i64` as an initial approximation.
ValueRef::Null => 8,
@@ -1380,8 +1601,8 @@ impl ValueRef<'_> {
ValueRef::Int64(_) => 8,
ValueRef::Float32(_) => 4,
ValueRef::Float64(_) => 8,
ValueRef::String(v) => std::mem::size_of_val(v),
ValueRef::Binary(v) => std::mem::size_of_val(v),
ValueRef::String(v) => std::mem::size_of_val(*v),
ValueRef::Binary(v) => std::mem::size_of_val(*v),
ValueRef::Date(_) => 4,
ValueRef::Timestamp(_) => 16,
ValueRef::Time(_) => 16,
@@ -1393,20 +1614,81 @@ impl ValueRef<'_> {
ValueRef::List(v) => match v {
ListValueRef::Indexed { vector, .. } => vector.memory_size() / vector.len(),
ListValueRef::Ref { val } => val.estimated_size(),
ListValueRef::RefList { val, .. } => val.iter().map(|v| v.data_size()).sum(),
},
ValueRef::Struct(val) => match val {
StructValueRef::Indexed { vector, .. } => vector.memory_size() / vector.len(),
StructValueRef::Ref(val) => val.estimated_size(),
StructValueRef::RefList { val, .. } => val.iter().map(|v| v.data_size()).sum(),
},
}
}
}
#[cfg(test)]
mod tests {
use arrow::datatypes::DataType as ArrowDataType;
pub(crate) mod tests {
use arrow::datatypes::{DataType as ArrowDataType, Field};
use common_time::timezone::set_default_timezone;
use num_traits::Float;
use super::*;
use crate::types::StructField;
use crate::vectors::ListVectorBuilder;
/// Builds the shared test schema: id (non-null i32), name, age, address.
pub(crate) fn build_struct_type() -> StructType {
    // (name, data type, nullable) specs for the test struct schema.
    let specs = [
        ("id", ConcreteDataType::int32_datatype(), false),
        ("name", ConcreteDataType::string_datatype(), true),
        ("age", ConcreteDataType::uint8_datatype(), true),
        ("address", ConcreteDataType::string_datatype(), true),
    ];
    StructType::new(
        specs
            .into_iter()
            .map(|(name, datatype, nullable)| StructField::new(name.to_string(), datatype, nullable))
            .collect(),
    )
}
/// Builds the canonical test struct value matching [build_struct_type]:
/// `{ id: 1, name: "tom", age: 25, address: "94038" }`.
pub(crate) fn build_struct_value() -> StructValue {
    let struct_type = build_struct_type();
    // Items are positional: they must line up with the schema's field order.
    let struct_items = vec![
        Value::Int32(1),
        Value::String("tom".into()),
        Value::UInt8(25),
        Value::String("94038".into()),
    ];
    StructValue::try_new(struct_items, struct_type).unwrap()
}
/// Builds the DataFusion [ScalarValue::Struct] equivalent of
/// [build_struct_value], as a one-row Arrow [StructArray].
pub(crate) fn build_scalar_struct_value() -> ScalarValue {
    let struct_type = build_struct_type();
    // One single-element array per field, in schema order.
    let arrays = vec![
        ScalarValue::Int32(Some(1)).to_array().unwrap(),
        ScalarValue::Utf8(Some("tom".into())).to_array().unwrap(),
        ScalarValue::UInt8(Some(25)).to_array().unwrap(),
        ScalarValue::Utf8(Some("94038".into())).to_array().unwrap(),
    ];
    let struct_arrow_array = StructArray::new(struct_type.as_arrow_fields(), arrays, None);
    ScalarValue::Struct(Arc::new(struct_arrow_array))
}
/// Builds the canonical test list value: a boolean list `[true, false]`.
pub(crate) fn build_list_value() -> ListValue {
    ListValue::new(
        vec![Value::Boolean(true), Value::Boolean(false)],
        ConcreteDataType::boolean_datatype(),
    )
}
/// Builds the DataFusion [ScalarValue::List] equivalent of [build_list_value].
pub(crate) fn build_scalar_list_value() -> ScalarValue {
    let items = vec![
        ScalarValue::Boolean(Some(true)),
        ScalarValue::Boolean(Some(false)),
    ];
    ScalarValue::List(ScalarValue::new_list(&items, &ArrowDataType::Boolean, true))
}
#[test]
fn test_try_from_scalar_value() {
assert_eq!(
@@ -1518,23 +1800,11 @@ mod tests {
);
assert_eq!(
Value::List(ListValue::new(
vec![Value::Int32(1), Value::Null],
ConcreteDataType::list_datatype(ConcreteDataType::int32_datatype())
)),
ScalarValue::List(ScalarValue::new_list(
&[ScalarValue::Int32(Some(1)), ScalarValue::Int32(None)],
&ArrowDataType::Int32,
true,
))
.try_into()
.unwrap()
Value::List(build_list_value()),
build_scalar_list_value().try_into().unwrap()
);
assert_eq!(
Value::List(ListValue::new(
vec![],
ConcreteDataType::list_datatype(ConcreteDataType::uint32_datatype())
)),
Value::List(ListValue::new(vec![], ConcreteDataType::uint32_datatype())),
ScalarValue::List(ScalarValue::new_list(&[], &ArrowDataType::UInt32, true))
.try_into()
.unwrap()
@@ -1690,6 +1960,13 @@ mod tests {
Value::Null,
ScalarValue::Decimal128(None, 0, 0).try_into().unwrap()
);
let struct_value = build_struct_value();
let scalar_struct_value = build_scalar_struct_value();
assert_eq!(
Value::Struct(struct_value),
scalar_struct_value.try_into().unwrap()
);
}
#[test]
@@ -1861,6 +2138,19 @@ mod tests {
&ConcreteDataType::decimal128_datatype(38, 10),
&Value::Decimal128(Decimal128::new(1, 38, 10)),
);
check_type_and_value(
&ConcreteDataType::list_datatype(ConcreteDataType::boolean_datatype()),
&Value::List(ListValue::new(
vec![Value::Boolean(true)],
ConcreteDataType::boolean_datatype(),
)),
);
check_type_and_value(
&ConcreteDataType::struct_datatype(build_struct_type()),
&Value::Struct(build_struct_value()),
);
}
#[test]
@@ -1960,8 +2250,7 @@ mod tests {
to_json(Value::Duration(Duration::new_millisecond(1)))
);
let json_value: serde_json::Value =
serde_json::from_str(r#"{"items":[{"Int32":123}],"datatype":{"Int32":{}}}"#).unwrap();
let json_value: serde_json::Value = serde_json::from_str(r#"[123]"#).unwrap();
assert_eq!(
json_value,
to_json(Value::List(ListValue {
@@ -1969,6 +2258,36 @@ mod tests {
datatype: ConcreteDataType::int32_datatype(),
}))
);
let struct_value = StructValue::try_new(
vec![
Value::Int64(42),
Value::String("tomcat".into()),
Value::Boolean(true),
],
StructType::new(vec![
StructField::new("num".to_string(), ConcreteDataType::int64_datatype(), true),
StructField::new(
"name".to_string(),
ConcreteDataType::string_datatype(),
true,
),
StructField::new(
"yes_or_no".to_string(),
ConcreteDataType::boolean_datatype(),
true,
),
]),
)
.unwrap();
assert_eq!(
serde_json::Value::try_from(Value::Struct(struct_value)).unwrap(),
serde_json::json!({
"num": 42,
"name": "tomcat",
"yes_or_no": true
})
);
}
#[test]
@@ -2033,10 +2352,7 @@ mod tests {
check_as_value_ref!(Date, Date::new(103));
let list = ListValue {
items: vec![],
datatype: ConcreteDataType::int32_datatype(),
};
let list = build_list_value();
assert_eq!(
ValueRef::List(ListValueRef::Ref { val: &list }),
Value::List(list.clone()).as_value_ref()
@@ -2049,6 +2365,12 @@ mod tests {
ValueRef::Binary(jsonb_value.clone().as_slice()),
Value::Binary(jsonb_value.into()).as_value_ref()
);
let struct_value = build_struct_value();
assert_eq!(
ValueRef::Struct(StructValueRef::Ref(&struct_value)),
Value::Struct(struct_value.clone()).as_value_ref()
);
}
#[test]
@@ -2077,12 +2399,13 @@ mod tests {
check_as_correct!(Date::new(123), Date, as_date);
check_as_correct!(Time::new_second(12), Time, as_time);
check_as_correct!(Duration::new_second(12), Duration, as_duration);
let list = ListValue {
items: vec![],
datatype: ConcreteDataType::int32_datatype(),
};
let list = build_list_value();
check_as_correct!(ListValueRef::Ref { val: &list }, List, as_list);
let struct_value = build_struct_value();
check_as_correct!(StructValueRef::Ref(&struct_value), Struct, as_struct);
let wrong_value = ValueRef::Int32(12345);
assert!(wrong_value.as_binary().is_err());
assert!(wrong_value.as_string().is_err());
@@ -2128,12 +2451,8 @@ mod tests {
"1000ms"
);
assert_eq!(
Value::List(ListValue::new(
vec![Value::Int8(1), Value::Int8(2)],
ConcreteDataType::int8_datatype(),
))
.to_string(),
"Int8[1, 2]"
Value::List(build_list_value()).to_string(),
"Boolean[true, false]"
);
assert_eq!(
Value::List(ListValue::new(
@@ -2167,6 +2486,11 @@ mod tests {
.to_string(),
"TimestampNanosecond[]"
);
assert_eq!(
Value::Struct(build_struct_value()).to_string(),
"{ id: 1, name: tom, age: 25, address: 94038 }"
);
}
#[test]
@@ -2265,6 +2589,22 @@ mod tests {
.try_to_scalar_value(&ConcreteDataType::json_datatype())
.unwrap()
);
assert_eq!(
build_scalar_struct_value(),
Value::Struct(build_struct_value())
.try_to_scalar_value(&ConcreteDataType::struct_datatype(build_struct_type()))
.unwrap()
);
assert_eq!(
build_scalar_list_value(),
Value::List(build_list_value())
.try_to_scalar_value(&ConcreteDataType::list_datatype(
ConcreteDataType::boolean_datatype()
))
.unwrap()
);
}
#[test]
@@ -2403,6 +2743,22 @@ mod tests {
.try_to_scalar_value(&ConcreteDataType::json_datatype())
.unwrap()
);
assert_eq!(
ScalarValue::new_null_list(ArrowDataType::Boolean, true, 1),
Value::Null
.try_to_scalar_value(&ConcreteDataType::list_datatype(
ConcreteDataType::boolean_datatype(),
))
.unwrap()
);
assert_eq!(
ScalarStructBuilder::new_null(build_struct_type().as_arrow_fields()),
Value::Null
.try_to_scalar_value(&ConcreteDataType::struct_datatype(build_struct_type()))
.unwrap()
);
}
#[test]
@@ -2439,6 +2795,41 @@ mod tests {
}
}
#[test]
fn test_struct_value_to_scalar_value() {
    // Converting a struct `Value` into a DataFusion `ScalarValue` must
    // preserve both the field schema and every per-field value.
    let struct_value = build_struct_value();
    let scalar_value = struct_value
        .try_to_scalar_value(&build_struct_type())
        .unwrap();
    assert_eq!(scalar_value, build_scalar_struct_value());

    // The exhaustive `match` below already panics on any non-struct variant,
    // so a separate `assert!(matches!(..., ScalarValue::Struct(_)))` was
    // redundant and has been dropped.
    match scalar_value {
        ScalarValue::Struct(values) => {
            // Schema round-trips through the arrow representation.
            assert_eq!(&build_struct_type().as_arrow_fields(), values.fields());
            // Each column of the one-row struct array holds the original
            // field value: { id: 1, name: "tom", age: 25, address: "94038" }.
            assert_eq!(
                ScalarValue::try_from_array(values.column(0), 0).unwrap(),
                ScalarValue::Int32(Some(1))
            );
            assert_eq!(
                ScalarValue::try_from_array(values.column(1), 0).unwrap(),
                ScalarValue::Utf8(Some("tom".into()))
            );
            assert_eq!(
                ScalarValue::try_from_array(values.column(2), 0).unwrap(),
                ScalarValue::UInt8(Some(25))
            );
            assert_eq!(
                ScalarValue::try_from_array(values.column(3), 0).unwrap(),
                ScalarValue::Utf8(Some("94038".into()))
            );
        }
        _ => panic!("Unexpected value type"),
    }
}
#[test]
fn test_timestamp_to_scalar_value() {
assert_eq!(
@@ -2582,7 +2973,12 @@ mod tests {
}),
74,
);
check_value_ref_size_eq(&ValueRef::Decimal128(Decimal128::new(1234, 3, 1)), 32)
check_value_ref_size_eq(&ValueRef::Decimal128(Decimal128::new(1234, 3, 1)), 32);
check_value_ref_size_eq(
&ValueRef::Struct(StructValueRef::Ref(&build_struct_value())),
13,
);
}
#[test]

View File

@@ -71,6 +71,7 @@ pub use primitive::{
UInt32VectorBuilder, UInt64Vector, UInt64VectorBuilder,
};
pub use string::{StringVector, StringVectorBuilder};
pub use struct_vector::{StructVector, StructVectorBuilder};
pub use time::{
TimeMicrosecondVector, TimeMicrosecondVectorBuilder, TimeMillisecondVector,
TimeMillisecondVectorBuilder, TimeNanosecondVector, TimeNanosecondVectorBuilder,
@@ -196,13 +197,13 @@ pub trait MutableVector: Send + Sync {
fn to_vector_cloned(&self) -> VectorRef;
/// Try to push value ref to this mutable vector.
fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()>;
fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()>;
/// Push value ref to this mutable vector.
///
/// # Panics
/// Panics if error if data types mismatch.
fn push_value_ref(&mut self, value: ValueRef) {
fn push_value_ref(&mut self, value: &ValueRef) {
self.try_push_value_ref(value).unwrap_or_else(|_| {
panic!(
"expecting pushing value of datatype {:?}, actual {:?}",
@@ -439,9 +440,9 @@ pub mod tests {
fn test_mutable_vector_to_vector_cloned() {
// create a string vector builder
let mut builder = ConcreteDataType::string_datatype().create_mutable_vector(1024);
builder.push_value_ref(ValueRef::String("hello"));
builder.push_value_ref(ValueRef::String("world"));
builder.push_value_ref(ValueRef::String("!"));
builder.push_value_ref(&ValueRef::String("hello"));
builder.push_value_ref(&ValueRef::String("world"));
builder.push_value_ref(&ValueRef::String("!"));
// use MutableVector trait to_vector_cloned won't reset builder
let vector = builder.to_vector_cloned();

View File

@@ -241,7 +241,7 @@ impl MutableVector for BinaryVectorBuilder {
Arc::new(self.finish_cloned())
}
fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()> {
match value.as_binary()? {
Some(v) => self.mutable_array.append_value(v),
None => self.mutable_array.append_null(),
@@ -427,8 +427,8 @@ mod tests {
let input = BinaryVector::from_slice(&[b"world", b"one", b"two"]);
let mut builder = BinaryType.create_mutable_vector(3);
builder.push_value_ref(ValueRef::Binary("hello".as_bytes()));
assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());
builder.push_value_ref(&ValueRef::Binary("hello".as_bytes()));
assert!(builder.try_push_value_ref(&ValueRef::Int32(123)).is_err());
builder.extend_slice_of(&input, 1, 2).unwrap();
assert!(
builder

View File

@@ -179,7 +179,7 @@ impl MutableVector for BooleanVectorBuilder {
Arc::new(self.finish_cloned())
}
fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()> {
match value.as_boolean()? {
Some(v) => self.mutable_array.append_value(v),
None => self.mutable_array.append_null(),
@@ -357,8 +357,8 @@ mod tests {
let input = BooleanVector::from_slice(&[true, false, true]);
let mut builder = BooleanType.create_mutable_vector(3);
builder.push_value_ref(ValueRef::Boolean(true));
assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());
builder.push_value_ref(&ValueRef::Boolean(true));
assert!(builder.try_push_value_ref(&ValueRef::Int32(123)).is_err());
builder.extend_slice_of(&input, 1, 2).unwrap();
assert!(
builder

View File

@@ -69,8 +69,8 @@ mod tests {
let input = DateVector::from_slice([1, 2, 3]);
let mut builder = DateType.create_mutable_vector(3);
builder.push_value_ref(ValueRef::Date(Date::new(5)));
assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());
builder.push_value_ref(&ValueRef::Date(Date::new(5)));
assert!(builder.try_push_value_ref(&ValueRef::Int32(123)).is_err());
builder.extend_slice_of(&input, 1, 2).unwrap();
assert!(
builder

View File

@@ -314,7 +314,7 @@ impl MutableVector for Decimal128VectorBuilder {
Arc::new(self.finish_cloned())
}
fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()> {
let decimal_val = value.as_decimal128()?.map(|v| v.val());
self.mutable_array.append_option(decimal_val);
Ok(())

View File

@@ -33,6 +33,7 @@ use crate::data_type::ConcreteDataType;
use crate::error::{self, ConvertArrowArrayToScalarsSnafu, Result};
use crate::prelude::DataType;
use crate::scalars::{Scalar, ScalarVectorBuilder};
use crate::types::StructType;
use crate::value::{ListValue, ListValueRef, Value};
use crate::vectors::struct_vector::StructVector;
use crate::vectors::{
@@ -237,8 +238,14 @@ impl Helper {
let vector = Decimal128Vector::from(vec![v]).with_precision_and_scale(p, s)?;
ConstantVector::new(Arc::new(vector), length)
}
ScalarValue::Struct(v) => {
let struct_type = StructType::try_from(v.fields())?;
ConstantVector::new(
Arc::new(StructVector::try_new(struct_type, (*v).clone())?),
length,
)
}
ScalarValue::Decimal256(_, _, _)
| ScalarValue::Struct(_)
| ScalarValue::FixedSizeList(_)
| ScalarValue::LargeList(_)
| ScalarValue::Dictionary(_, _)
@@ -386,13 +393,16 @@ impl Helper {
}
}
ArrowDataType::Struct(_fields) => {
ArrowDataType::Struct(fields) => {
let array = array
.as_ref()
.as_any()
.downcast_ref::<StructArray>()
.unwrap();
Arc::new(StructVector::new(array.clone())?)
Arc::new(StructVector::try_new(
StructType::try_from(fields)?,
array.clone(),
)?)
}
ArrowDataType::Float16
| ArrowDataType::LargeList(_)
@@ -418,7 +428,7 @@ impl Helper {
pub fn try_from_row_into_vector(row: &[Value], dt: &ConcreteDataType) -> Result<VectorRef> {
let mut builder = dt.create_mutable_vector(row.len());
for val in row {
builder.try_push_value_ref(val.as_value_ref())?;
builder.try_push_value_ref(&val.as_value_ref())?;
}
let vector = builder.to_vector();
Ok(vector)

View File

@@ -49,11 +49,15 @@ impl ListVector {
pub(crate) fn as_arrow(&self) -> &dyn Array {
&self.array
}
pub(crate) fn item_type(&self) -> ConcreteDataType {
self.item_type.clone()
}
}
impl Vector for ListVector {
fn data_type(&self) -> ConcreteDataType {
ConcreteDataType::List(ListType::new(self.item_type.clone()))
ConcreteDataType::List(ListType::new(self.item_type()))
}
fn vector_type_name(&self) -> String {
@@ -246,7 +250,7 @@ impl ListVectorBuilder {
fn push_list_value(&mut self, list_value: &ListValue) -> Result<()> {
for v in list_value.items() {
self.values_builder.try_push_value_ref(v.as_value_ref())?;
self.values_builder.try_push_value_ref(&v.as_value_ref())?;
}
self.finish_list(true);
@@ -279,7 +283,7 @@ impl MutableVector for ListVectorBuilder {
Arc::new(self.finish_cloned())
}
fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()> {
if let Some(list_ref) = value.as_list()? {
match list_ref {
ListValueRef::Indexed { vector, idx } => match vector.get(idx).as_list()? {
@@ -287,6 +291,13 @@ impl MutableVector for ListVectorBuilder {
None => self.push_null(),
},
ListValueRef::Ref { val } => self.push_list_value(val)?,
ListValueRef::RefList { val, item_datatype } => {
let list_value = ListValue::new(
val.iter().map(|v| Value::from(v.clone())).collect(),
item_datatype.clone(),
);
self.push_list_value(&list_value)?;
}
}
} else {
self.push_null();
@@ -298,7 +309,7 @@ impl MutableVector for ListVectorBuilder {
fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> {
for idx in offset..offset + length {
let value = vector.get_ref(idx);
self.try_push_value_ref(value)?;
self.try_push_value_ref(&value)?;
}
Ok(())
@@ -320,7 +331,7 @@ impl ScalarVectorBuilder for ListVectorBuilder {
// We expect the input ListValue has the same inner type as the builder when using
// push(), so just panic if `push_value_ref()` returns error, which indicate an
// invalid input value type.
self.try_push_value_ref(value.into()).unwrap_or_else(|e| {
self.try_push_value_ref(&value.into()).unwrap_or_else(|e| {
panic!(
"Failed to push value, expect value type {:?}, err:{}",
self.item_type, e
@@ -663,13 +674,13 @@ pub mod tests {
fn test_list_vector_builder() {
let mut builder =
ListType::new(ConcreteDataType::int32_datatype()).create_mutable_vector(3);
builder.push_value_ref(ValueRef::List(ListValueRef::Ref {
builder.push_value_ref(&ValueRef::List(ListValueRef::Ref {
val: &ListValue::new(
vec![Value::Int32(4), Value::Null, Value::Int32(6)],
ConcreteDataType::int32_datatype(),
),
}));
assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());
assert!(builder.try_push_value_ref(&ValueRef::Int32(123)).is_err());
let data = vec![
Some(vec![Some(1), Some(2), Some(3)]),

View File

@@ -21,6 +21,7 @@ use snafu::{OptionExt, ensure};
use crate::data_type::ConcreteDataType;
use crate::error::{self, Result};
use crate::prelude::{ScalarVector, ScalarVectorBuilder};
use crate::serialize::Serializable;
use crate::types::NullType;
use crate::value::{Value, ValueRef};
@@ -112,6 +113,21 @@ impl Vector for NullVector {
}
}
/// `ScalarVector` support so `NullVector` can participate in generic
/// scalar-vector code paths (e.g. builders and replication helpers).
impl ScalarVector for NullVector {
    // Null items carry no data; the unit type stands in for "a null".
    type OwnedItem = ();
    type RefItem<'a> = ();
    type Iter<'a> = NullIter<'a>;
    type Builder = NullVectorBuilder;

    // Always yields `Some(())` — every slot of a null vector "holds" unit.
    // NOTE(review): `_idx` is not bounds-checked here, so out-of-range
    // indices also return `Some(())`; confirm callers guarantee in-range.
    fn get_data(&self, _idx: usize) -> Option<Self::RefItem<'_>> {
        Some(())
    }

    fn iter_data(&self) -> Self::Iter<'_> {
        NullIter::new(self)
    }
}
impl fmt::Debug for NullVector {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "NullVector({})", self.len())
@@ -126,6 +142,30 @@ impl Serializable for NullVector {
vectors::impl_try_from_arrow_array_for_vector!(NullArray, NullVector);
/// Iterator over the items of a [`NullVector`].
///
/// Every in-bounds position yields `Some(())` (the unit item of a null
/// vector); once `index` reaches the vector length the iterator is exhausted.
pub struct NullIter<'a> {
    /// Vector being iterated.
    vector: &'a NullVector,
    /// Next position to yield; grows monotonically up to `vector.len()`.
    index: usize,
}

impl<'a> NullIter<'a> {
    /// Creates an iterator positioned at the start of `vector`.
    pub fn new(vector: &'a NullVector) -> Self {
        NullIter { vector, index: 0 }
    }
}

impl<'a> Iterator for NullIter<'a> {
    type Item = Option<()>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.index < self.vector.len() {
            self.index += 1;
            Some(Some(()))
        } else {
            None
        }
    }

    /// Exact remaining item count. The original impl relied on the default
    /// `(0, None)` hint; reporting the exact size lets consumers such as
    /// `collect` preallocate, and mirrors `StructIter::size_hint`.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.vector.len().saturating_sub(self.index);
        (remaining, Some(remaining))
    }
}
#[derive(Default)]
pub struct NullVectorBuilder {
length: usize,
@@ -158,7 +198,7 @@ impl MutableVector for NullVectorBuilder {
Arc::new(NullVector::new(self.length))
}
fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()> {
ensure!(
value.is_null(),
error::CastTypeSnafu {
@@ -197,6 +237,29 @@ impl MutableVector for NullVectorBuilder {
}
}
/// `ScalarVectorBuilder` support: a null-vector builder only needs to count
/// how many items have been pushed.
impl ScalarVectorBuilder for NullVectorBuilder {
    type VectorType = NullVector;

    /// Capacity is meaningless here — the builder stores nothing but a
    /// length — so any requested capacity maps to the default builder.
    fn with_capacity(_capacity: usize) -> Self {
        Self::default()
    }

    /// Pushing an item (present or absent — the only item type is `()`)
    /// simply grows the recorded length by one.
    fn push(&mut self, _value: Option<<Self::VectorType as ScalarVector>::RefItem<'_>>) {
        self.length += 1;
    }

    /// Builds the vector from the accumulated length, resetting the builder
    /// back to an empty state in the same step.
    fn finish(&mut self) -> Self::VectorType {
        NullVector::new(std::mem::replace(&mut self.length, 0))
    }

    /// Builds the vector without consuming the builder's state.
    fn finish_cloned(&self) -> Self::VectorType {
        NullVector::new(self.length)
    }
}
pub(crate) fn replicate_null(vector: &NullVector, offsets: &[usize]) -> VectorRef {
assert_eq!(offsets.len(), vector.len());
@@ -265,7 +328,7 @@ mod tests {
fn test_null_vector_builder() {
let mut builder = NullType.create_mutable_vector(3);
builder.push_null();
assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());
assert!(builder.try_push_value_ref(&ValueRef::Int32(123)).is_err());
let input = NullVector::new(3);
builder.extend_slice_of(&input, 1, 2).unwrap();

View File

@@ -29,7 +29,7 @@ pub(crate) fn replicate_scalar<C: ScalarVector>(c: &C, offsets: &[usize]) -> Vec
for (i, offset) in offsets.iter().enumerate() {
let data = c.get_data(i);
for _ in previous_offset..*offset {
builder.push(data);
builder.push(data.clone());
}
previous_offset = *offset;
}

View File

@@ -309,7 +309,7 @@ impl<T: LogicalPrimitiveType> MutableVector for PrimitiveVectorBuilder<T> {
Arc::new(self.finish_cloned())
}
fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()> {
let primitive = T::cast_value_ref(value)?;
match primitive {
Some(v) => self.mutable_array.append_value(v.into_native()),
@@ -549,8 +549,8 @@ mod tests {
#[test]
fn test_primitive_vector_builder() {
let mut builder = Int64Type::default().create_mutable_vector(3);
builder.push_value_ref(ValueRef::Int64(123));
assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());
builder.push_value_ref(&ValueRef::Int64(123));
assert!(builder.try_push_value_ref(&ValueRef::Int32(123)).is_err());
let input = Int64Vector::from_slice([7, 8, 9]);
builder.extend_slice_of(&input, 1, 2).unwrap();
@@ -711,8 +711,8 @@ mod tests {
#[test]
fn test_primitive_vector_builder_finish_cloned() {
let mut builder = Int64Type::default().create_mutable_vector(3);
builder.push_value_ref(ValueRef::Int64(123));
builder.push_value_ref(ValueRef::Int64(456));
builder.push_value_ref(&ValueRef::Int64(123));
builder.push_value_ref(&ValueRef::Int64(456));
let vector = builder.to_vector_cloned();
assert_eq!(vector.len(), 2);
assert_eq!(vector.null_count(), 0);

View File

@@ -194,7 +194,7 @@ impl MutableVector for StringVectorBuilder {
Arc::new(self.finish_cloned())
}
fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()> {
match value.as_string()? {
Some(v) => self.mutable_array.append_value(v),
None => self.mutable_array.append_null(),
@@ -289,8 +289,8 @@ mod tests {
#[test]
fn test_string_vector_builder() {
let mut builder = StringVectorBuilder::with_capacity(3);
builder.push_value_ref(ValueRef::String("hello"));
assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());
builder.push_value_ref(&ValueRef::String("hello"));
assert!(builder.try_push_value_ref(&ValueRef::Int32(123)).is_err());
let input = StringVector::from_slice(&["world", "one", "two"]);
builder.extend_slice_of(&input, 1, 2).unwrap();

View File

@@ -12,33 +12,44 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::sync::Arc;
use arrow::array::NullBufferBuilder;
use arrow::compute::TakeOptions;
use arrow::datatypes::DataType as ArrowDataType;
use arrow_array::{Array, ArrayRef, StructArray};
use serde_json::Value as JsonValue;
use snafu::ResultExt;
use datafusion_common::ScalarValue;
use snafu::{ResultExt, ensure};
use crate::error::{self, ArrowComputeSnafu, Result, UnsupportedOperationSnafu};
use crate::prelude::ConcreteDataType;
use crate::error::{
ArrowComputeSnafu, ConversionSnafu, Error, InconsistentStructFieldsAndItemsSnafu, Result,
SerializeSnafu, UnsupportedOperationSnafu,
};
use crate::prelude::{ConcreteDataType, DataType, ScalarVector, ScalarVectorBuilder};
use crate::serialize::Serializable;
use crate::value::{Value, ValueRef};
use crate::types::StructType;
use crate::value::{StructValue, StructValueRef, Value, ValueRef};
use crate::vectors::operations::VectorOp;
use crate::vectors::{self, Helper, Validity, Vector, VectorRef};
use crate::vectors::{self, Helper, MutableVector, Validity, Vector, VectorRef};
/// A simple wrapper around `StructArray` to represent a vector of structs in GreptimeDB.
#[derive(Debug, PartialEq)]
pub struct StructVector {
array: StructArray,
data_type: ConcreteDataType,
fields: StructType,
}
#[allow(unused)]
impl StructVector {
pub fn new(array: StructArray) -> Result<Self> {
let fields = array.fields();
let data_type = ConcreteDataType::Struct(fields.try_into()?);
Ok(StructVector { array, data_type })
pub fn try_new(fields: StructType, array: StructArray) -> Result<Self> {
ensure!(
fields.fields().len() == array.fields().len(),
InconsistentStructFieldsAndItemsSnafu {
field_len: fields.fields().len(),
item_len: array.fields().len(),
}
);
Ok(StructVector { array, fields })
}
pub fn array(&self) -> &StructArray {
@@ -48,11 +59,15 @@ impl StructVector {
pub fn as_arrow(&self) -> &dyn Array {
&self.array
}
pub fn struct_type(&self) -> &StructType {
&self.fields
}
}
impl Vector for StructVector {
fn data_type(&self) -> ConcreteDataType {
self.data_type.clone()
ConcreteDataType::struct_datatype(self.fields.clone())
}
fn vector_type_name(&self) -> String {
@@ -94,16 +109,36 @@ impl Vector for StructVector {
fn slice(&self, offset: usize, length: usize) -> VectorRef {
Arc::new(StructVector {
array: self.array.slice(offset, length),
data_type: self.data_type.clone(),
fields: self.fields.clone(),
})
}
fn get(&self, _: usize) -> Value {
unimplemented!("StructValue not supported yet")
fn get(&self, index: usize) -> Value {
if !self.array.is_valid(index) {
return Value::Null;
}
let values = (0..self.fields.fields().len())
.map(|i| {
let field_array = &self.array.column(i);
if field_array.is_null(i) {
Value::Null
} else {
let scalar_value = ScalarValue::try_from_array(field_array, index).unwrap();
Value::try_from(scalar_value).unwrap()
}
})
.collect();
Value::Struct(StructValue::try_new(values, self.fields.clone()).unwrap())
}
fn get_ref(&self, _: usize) -> ValueRef {
unimplemented!("StructValue not supported yet")
fn get_ref(&self, index: usize) -> ValueRef {
ValueRef::Struct(StructValueRef::Indexed {
vector: self,
idx: index,
})
}
}
@@ -124,9 +159,7 @@ impl VectorOp for StructVector {
column_arrays,
self.array.nulls().cloned(),
);
Arc::new(
StructVector::new(replicated_array).expect("Failed to create replicated StructVector"),
)
Arc::new(StructVector::try_new(self.fields.clone(), replicated_array).unwrap())
}
fn cast(&self, _to_type: &ConcreteDataType) -> Result<VectorRef> {
@@ -159,23 +192,295 @@ impl VectorOp for StructVector {
impl Serializable for StructVector {
fn serialize_to_json(&self) -> Result<Vec<serde_json::Value>> {
let mut result = serde_json::Map::new();
for (field, value) in self.array.fields().iter().zip(self.array.columns().iter()) {
let value_vector = Helper::try_into_vector(value)?;
let field_value = value_vector.serialize_to_json()?;
result.insert(field.name().clone(), JsonValue::Array(field_value));
}
let fields = JsonValue::Object(result);
let data_type = serde_json::to_value(&self.data_type).context(error::SerializeSnafu)?;
Ok(vec![JsonValue::Object(
[
("fields".to_string(), fields),
("data_type".to_string(), data_type),
]
let vectors = self
.array
.columns()
.iter()
.cloned()
.collect(),
)])
.map(|value_array| Helper::try_into_vector(value_array))
.collect::<Result<Vec<_>>>()?;
(0..self.array.len())
.map(|idx| {
let mut result = serde_json::Map::with_capacity(vectors.len());
for (field, vector) in self.fields.fields().iter().zip(vectors.iter()) {
let field_value = vector.get(idx);
result.insert(
field.name().to_string(),
field_value.try_into().context(SerializeSnafu)?,
);
}
Ok(result.into())
})
.collect::<Result<Vec<serde_json::Value>>>()
}
}
impl TryFrom<StructArray> for StructVector {
type Error = Error;
fn try_from(array: StructArray) -> Result<Self> {
let fields = match array.data_type() {
ArrowDataType::Struct(fields) => StructType::try_from(fields)?,
other => ConversionSnafu {
from: other.to_string(),
}
.fail()?,
};
Ok(Self { array, fields })
}
}
/// `ScalarVector` support so `StructVector` can be used with generic
/// scalar-vector builders and iteration helpers.
impl ScalarVector for StructVector {
    type OwnedItem = StructValue;
    type RefItem<'a> = StructValueRef<'a>;
    type Iter<'a> = StructIter<'a>;
    type Builder = StructVectorBuilder;

    /// Returns a lazy, by-index reference into this vector, or `None` when
    /// the row is null according to the arrow validity buffer.
    fn get_data(&self, idx: usize) -> Option<Self::RefItem<'_>> {
        if self.array.is_valid(idx) {
            Some(StructValueRef::Indexed { vector: self, idx })
        } else {
            None
        }
    }

    fn iter_data(&self) -> Self::Iter<'_> {
        StructIter::new(self)
    }
}
/// Iterator over the rows of a [`StructVector`], yielding `None` for null
/// rows and a lazy [`StructValueRef::Indexed`] for valid ones.
pub struct StructIter<'a> {
    /// Vector being iterated.
    vector: &'a StructVector,
    /// Next row to yield; grows monotonically up to `vector.len()`.
    index: usize,
}

impl<'a> StructIter<'a> {
    /// Creates an iterator positioned at the first row of `vector`.
    pub fn new(vector: &'a StructVector) -> Self {
        Self { vector, index: 0 }
    }
}

impl<'a> Iterator for StructIter<'a> {
    type Item = Option<StructValueRef<'a>>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.index < self.vector.len() {
            let idx = self.index;
            self.index += 1;
            if self.vector.is_null(idx) {
                // Null row: the item slot exists but holds no struct value.
                Some(None)
            } else {
                let value = StructValueRef::Indexed {
                    vector: self.vector,
                    idx,
                };
                Some(Some(value))
            }
        } else {
            None
        }
    }

    /// Exact remaining row count.
    ///
    /// Bug fix: the previous implementation returned the *total* vector
    /// length regardless of how many rows were already consumed, which
    /// violates the `Iterator::size_hint` contract (the lower bound must not
    /// exceed the number of remaining items).
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.vector.len().saturating_sub(self.index);
        (remaining, Some(remaining))
    }
}
/// Mutable builder for [`StructVector`].
///
/// Keeps one child builder per struct field plus a validity buffer so null
/// rows can be recorded without pushing real values.
pub struct StructVectorBuilder {
    // One mutable child vector builder per field, in field order.
    value_builders: Vec<Box<dyn MutableVector>>,
    // Row-level validity (null) tracking for the struct array.
    null_buffer: NullBufferBuilder,
    // Schema of the struct being built.
    fields: StructType,
}

impl StructVectorBuilder {
    /// Creates a builder for `fields` with `capacity` rows reserved in every
    /// child builder and the validity buffer.
    pub fn with_type_and_capacity(fields: StructType, capacity: usize) -> Self {
        let value_builders = fields
            .fields()
            .iter()
            .map(|f| f.data_type().create_mutable_vector(capacity))
            .collect();
        Self {
            value_builders,
            null_buffer: NullBufferBuilder::new(capacity),
            fields,
        }
    }

    /// Appends one non-null struct row, pushing each item into the matching
    /// child builder by position.
    ///
    /// NOTE(review): assumes `struct_value.items()` has exactly as many
    /// entries as there are field builders — presumably guaranteed by
    /// `StructValue::try_new` validation; a longer item list would panic on
    /// the index, a shorter one would misalign columns. Confirm upstream.
    fn push_struct_value(&mut self, struct_value: &StructValue) -> Result<()> {
        for (index, value) in struct_value.items().iter().enumerate() {
            self.value_builders[index].try_push_value_ref(&value.as_value_ref())?;
        }

        self.null_buffer.append_non_null();
        Ok(())
    }

    /// Appends one null row: every child builder gets a null so all columns
    /// stay the same length, and the validity buffer records the null.
    fn push_null_struct_value(&mut self) {
        for builder in &mut self.value_builders {
            builder.push_null();
        }
        self.null_buffer.append_null();
    }
}
impl MutableVector for StructVectorBuilder {
    /// Data type of the vector under construction.
    fn data_type(&self) -> ConcreteDataType {
        ConcreteDataType::struct_datatype(self.fields.clone())
    }

    /// Number of rows pushed so far (tracked by the validity buffer).
    fn len(&self) -> usize {
        self.null_buffer.len()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_mut_any(&mut self) -> &mut dyn Any {
        self
    }

    fn to_vector(&mut self) -> VectorRef {
        Arc::new(self.finish())
    }

    fn to_vector_cloned(&self) -> VectorRef {
        Arc::new(self.finish_cloned())
    }

    /// Pushes a struct value ref, dispatching on its representation:
    /// indexed (resolve through the source vector), direct ref, or a raw
    /// value list paired with its field schema. A non-struct null pushes a
    /// null row; a non-struct non-null value errors via `as_struct()`.
    fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()> {
        if let Some(struct_ref) = value.as_struct()? {
            match struct_ref {
                StructValueRef::Indexed { vector, idx } => match vector.get(idx).as_struct()? {
                    Some(struct_value) => self.push_struct_value(struct_value)?,
                    None => self.push_null(),
                },
                StructValueRef::Ref(val) => self.push_struct_value(val)?,
                StructValueRef::RefList { val, fields } => {
                    // Materialize the borrowed items into an owned, validated
                    // StructValue before pushing.
                    let struct_value = StructValue::try_new(
                        val.iter().map(|v| Value::from(v.clone())).collect(),
                        fields.clone(),
                    )?;
                    self.push_struct_value(&struct_value)?;
                }
            }
        } else {
            self.push_null();
        }

        Ok(())
    }

    /// Appends `length` rows starting at `offset` from another vector.
    fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> {
        for idx in offset..offset + length {
            let value = vector.get_ref(idx);
            self.try_push_value_ref(&value)?;
        }

        Ok(())
    }

    fn push_null(&mut self) {
        self.push_null_struct_value();
    }
}
impl ScalarVectorBuilder for StructVectorBuilder {
    type VectorType = StructVector;

    /// # Panics
    ///
    /// Always panics: a struct builder cannot be created without a schema.
    /// Use [`StructVectorBuilder::with_type_and_capacity`] instead.
    fn with_capacity(_capacity: usize) -> Self {
        panic!("Must use StructVectorBuilder::with_type_capacity()");
    }

    /// Pushes a struct item (or a null when `value` is `None`).
    ///
    /// # Panics
    ///
    /// Panics if the value's type does not match the builder's schema.
    fn push(&mut self, value: Option<<Self::VectorType as ScalarVector>::RefItem<'_>>) {
        self.try_push_value_ref(&value.map(ValueRef::Struct).unwrap_or(ValueRef::Null))
            .unwrap_or_else(|e| {
                panic!(
                    "Failed to push value, expect value type {:?}, err:{}",
                    self.fields, e
                );
            });
    }

    /// Finalizes every child builder into arrow arrays and assembles the
    /// struct vector, consuming the accumulated state.
    fn finish(&mut self) -> Self::VectorType {
        let arrays = self
            .value_builders
            .iter_mut()
            .map(|b| b.to_vector().to_arrow_array())
            .collect();
        let struct_array = StructArray::new(
            self.fields.as_arrow_fields(),
            arrays,
            self.null_buffer.finish(),
        );
        // Safety of unwrap: arrays were built from this builder's own
        // `fields`, so the field/column counts always match.
        StructVector::try_new(self.fields.clone(), struct_array).unwrap()
    }

    /// Like [`Self::finish`] but leaves the builder state intact.
    fn finish_cloned(&self) -> Self::VectorType {
        let arrays = self
            .value_builders
            .iter()
            .map(|b| b.to_vector_cloned().to_arrow_array())
            .collect();
        let struct_array = StructArray::new(
            self.fields.as_arrow_fields(),
            arrays,
            self.null_buffer.finish_cloned(),
        );
        // Safety of unwrap: same invariant as in `finish`.
        StructVector::try_new(self.fields.clone(), struct_array).unwrap()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value::tests::*;

    // End-to-end builder test: push 10 identical struct rows and 5 nulls,
    // then verify type, lengths, null accounting, iteration, and per-field
    // values of a materialized row.
    #[test]
    fn test_struct_vector_builder() {
        let struct_type = build_struct_type();
        let struct_values = (0..10).map(|_| build_struct_value());
        let mut builder = StructVectorBuilder::with_type_and_capacity(struct_type.clone(), 20);
        for value in struct_values {
            builder.push(Some(StructValueRef::Ref(&value)));
        }
        builder.push_nulls(5);
        let vector = builder.finish();

        assert_eq!(
            vector.data_type(),
            ConcreteDataType::struct_datatype(struct_type.clone())
        );
        assert_eq!(vector.len(), 15);
        assert_eq!(vector.null_count(), 5);

        // iter_data yields Some(ref) for valid rows and None for nulls.
        let mut null_count = 0;
        for item in vector.iter_data() {
            if let Some(value) = item.as_ref() {
                assert_eq!(value.struct_type(), &struct_type);
            } else {
                null_count += 1;
            }
        }
        assert_eq!(5, null_count);

        // Row 2 is a valid row; its items must match build_struct_value():
        // { id: 1, name: "tom", age: 25, address: "94038" }.
        let value = vector.get(2);
        if let Value::Struct(struct_value) = value {
            assert_eq!(struct_value.struct_type(), &struct_type);
            let mut items = struct_value.items().iter();
            assert_eq!(items.next(), Some(&Value::Int32(1)));
            assert_eq!(items.next(), Some(&Value::String("tom".into())));
            assert_eq!(items.next(), Some(&Value::UInt8(25)));
            assert_eq!(items.next(), Some(&Value::String("94038".into())));
            assert_eq!(items.next(), None);
        } else {
            panic!("Expected a struct value");
        }
    }
}

View File

@@ -193,10 +193,12 @@ pub fn table_info_value_to_relation_desc(
pub fn from_proto_to_data_type(
column_schema: &api::v1::ColumnSchema,
) -> Result<ConcreteDataType, Error> {
let wrapper =
ColumnDataTypeWrapper::try_new(column_schema.datatype, column_schema.datatype_extension)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let wrapper = ColumnDataTypeWrapper::try_new(
column_schema.datatype,
column_schema.datatype_extension.clone(),
)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let cdt = ConcreteDataType::from(wrapper);
Ok(cdt)

View File

@@ -210,8 +210,8 @@ impl TimeWindowExpr {
let mut vector = cdt.create_mutable_vector(rows.rows.len());
for row in rows.rows {
let value = pb_value_to_value_ref(&row.values[ts_col_index], &None);
vector.try_push_value_ref(value).context(DataTypeSnafu {
let value = pb_value_to_value_ref(&row.values[ts_col_index], None);
vector.try_push_value_ref(&value).context(DataTypeSnafu {
msg: "Failed to convert rows to columns",
})?;
}

View File

@@ -611,7 +611,7 @@ fn reduce_batch_subgraph(
i
)
})?
.try_push_value_ref(v.as_value_ref())
.try_push_value_ref(&v.as_value_ref())
.context(DataTypeSnafu {
msg: "Failed to push value",
})?;

View File

@@ -122,7 +122,7 @@ impl Batch {
);
for (idx, value) in row.iter().enumerate() {
builder[idx]
.try_push_value_ref(value.as_value_ref())
.try_push_value_ref(&value.as_value_ref())
.context(DataTypeSnafu {
msg: "Failed to convert rows to columns",
})?;

View File

@@ -136,7 +136,7 @@ impl DfScalarFunction {
{
let typ = typ.scalar_type();
let mut array = typ.create_mutable_vector(1);
array.push_value_ref(values[idx].as_value_ref());
array.push_value_ref(&values[idx].as_value_ref());
cols.push(array.to_vector().to_arrow_array());
}
let schema = self.df_schema.inner().clone();

View File

@@ -194,7 +194,7 @@ impl From<ProtoRow> for Row {
Row::pack(
row.values
.iter()
.map(|pb_val| -> Value { pb_value_to_value_ref(pb_val, &None).into() }),
.map(|pb_val| -> Value { pb_value_to_value_ref(pb_val, None).into() }),
)
}
}

View File

@@ -87,7 +87,7 @@ impl Limiter {
for insert in &request.inserts {
for column in &insert.columns {
if let Some(values) = &column.values {
size += self.size_of_column_values(values);
size += Self::size_of_column_values(values);
}
}
}
@@ -104,7 +104,7 @@ impl Limiter {
for row in &rows.rows {
for value in &row.values {
if let Some(value) = &value.value_data {
size += self.size_of_value_data(value);
size += Self::size_of_value_data(value);
}
}
}
@@ -113,7 +113,7 @@ impl Limiter {
size
}
fn size_of_column_values(&self, values: &Values) -> usize {
fn size_of_column_values(values: &Values) -> usize {
let mut size: usize = 0;
size += values.i8_values.len() * size_of::<i32>();
size += values.i16_values.len() * size_of::<i32>();
@@ -146,10 +146,41 @@ impl Limiter {
size += values.interval_day_time_values.len() * size_of::<i64>();
size += values.interval_month_day_nano_values.len() * size_of::<IntervalMonthDayNano>();
size += values.decimal128_values.len() * size_of::<Decimal128>();
size += values
.list_values
.iter()
.map(|v| {
v.items
.iter()
.map(|item| {
item.value_data
.as_ref()
.map(Self::size_of_value_data)
.unwrap_or(0)
})
.sum::<usize>()
})
.sum::<usize>();
size += values
.struct_values
.iter()
.map(|v| {
v.items
.iter()
.map(|item| {
item.value_data
.as_ref()
.map(Self::size_of_value_data)
.unwrap_or(0)
})
.sum::<usize>()
})
.sum::<usize>();
size
}
fn size_of_value_data(&self, value: &ValueData) -> usize {
fn size_of_value_data(value: &ValueData) -> usize {
match value {
ValueData::I8Value(_) => size_of::<i32>(),
ValueData::I16Value(_) => size_of::<i32>(),
@@ -178,6 +209,26 @@ impl Limiter {
ValueData::IntervalDayTimeValue(_) => size_of::<i64>(),
ValueData::IntervalMonthDayNanoValue(_) => size_of::<IntervalMonthDayNano>(),
ValueData::Decimal128Value(_) => size_of::<Decimal128>(),
ValueData::ListValue(list_values) => list_values
.items
.iter()
.map(|item| {
item.value_data
.as_ref()
.map(Self::size_of_value_data)
.unwrap_or(0)
})
.sum(),
ValueData::StructValue(struct_values) => struct_values
.items
.iter()
.map(|item| {
item.value_data
.as_ref()
.map(Self::size_of_value_data)
.unwrap_or(0)
})
.sum(),
}
}
}

View File

@@ -82,11 +82,11 @@ impl RowModifier {
let internal_columns = [
(
ReservedColumnId::table_id(),
api::helper::pb_value_to_value_ref(&table_id, &None),
api::helper::pb_value_to_value_ref(&table_id, None),
),
(
ReservedColumnId::tsid(),
api::helper::pb_value_to_value_ref(&tsid, &None),
api::helper::pb_value_to_value_ref(&tsid, None),
),
];
self.codec
@@ -323,7 +323,7 @@ impl RowIter<'_> {
idx.column_id,
api::helper::pb_value_to_value_ref(
&self.row.values[idx.index],
&self.schema[idx.index].datatype_extension,
self.schema[idx.index].datatype_extension.as_ref(),
),
)
})

View File

@@ -46,7 +46,7 @@ fn encode_sparse(c: &mut Criterion) {
b.iter(|| {
let mut buffer = Vec::new();
codec
.encode_to_vec(internal_columns.into_iter(), &mut buffer)
.encode_to_vec(internal_columns.clone().into_iter(), &mut buffer)
.unwrap();
codec
.encode_to_vec(

View File

@@ -205,7 +205,7 @@ impl KeyValue<'_> {
.map(|idx| match idx {
Some(i) => api::helper::pb_value_to_value_ref(
&self.row.values[*i],
&self.schema[*i].datatype_extension,
self.schema[*i].datatype_extension.as_ref(),
),
None => ValueRef::Null,
})
@@ -218,7 +218,7 @@ impl KeyValue<'_> {
.map(|idx| match idx {
Some(i) => api::helper::pb_value_to_value_ref(
&self.row.values[*i],
&self.schema[*i].datatype_extension,
self.schema[*i].datatype_extension.as_ref(),
),
None => ValueRef::Null,
})
@@ -230,7 +230,7 @@ impl KeyValue<'_> {
let index = self.helper.indices[self.helper.num_primary_key_column].unwrap();
api::helper::pb_value_to_value_ref(
&self.row.values[index],
&self.schema[index].datatype_extension,
self.schema[index].datatype_extension.as_ref(),
)
}

View File

@@ -516,7 +516,7 @@ impl PrimaryKeyCodec for DensePrimaryKeyCodec {
values: &[(ColumnId, ValueRef)],
buffer: &mut Vec<u8>,
) -> Result<()> {
let iter = values.iter().map(|(_, v)| *v);
let iter = values.iter().map(|(_, v)| v.clone());
self.encode_dense(iter, buffer)
}

View File

@@ -310,7 +310,7 @@ impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
values: &[(ColumnId, ValueRef)],
buffer: &mut Vec<u8>,
) -> Result<()> {
self.encode_to_vec(values.iter().map(|v| (v.0, v.1)), buffer)
self.encode_to_vec(values.iter().map(|v| (v.0, v.1.clone())), buffer)
}
fn estimated_size(&self) -> Option<usize> {
@@ -678,7 +678,12 @@ mod tests {
.has_column(&buffer, &mut offsets_map, column_id)
.unwrap();
let value = codec.decode_value_at(&buffer, offset, column_id).unwrap();
let expected_value = row.iter().find(|(id, _)| *id == column_id).unwrap().1;
let expected_value = row
.iter()
.find(|(id, _)| *id == column_id)
.unwrap()
.1
.clone();
assert_eq!(value.as_value_ref(), expected_value);
}
}

View File

@@ -54,7 +54,7 @@ impl FieldBuilder {
}
Ok(())
}
FieldBuilder::Other(b) => b.try_push_value_ref(value),
FieldBuilder::Other(b) => b.try_push_value_ref(&value),
}
}

View File

@@ -262,7 +262,7 @@ impl PrimaryKeyColumnBuilder {
}
}
PrimaryKeyColumnBuilder::Vector(builder) => {
builder.push_value_ref(value);
builder.push_value_ref(&value);
}
}
Ok(())
@@ -785,11 +785,11 @@ fn mutations_to_record_batch(
.encode_to_vec(row.primary_keys(), &mut pk_buffer)
.context(EncodeSnafu)?;
pk_builder.append_value(pk_buffer.as_bytes());
ts_vector.push_value_ref(row.timestamp());
ts_vector.push_value_ref(&row.timestamp());
sequence_builder.append_value(row.sequence());
op_type_builder.append_value(row.op_type() as u8);
for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
builder.push_value_ref(field);
builder.push_value_ref(&field);
}
}
}

View File

@@ -219,7 +219,7 @@ impl DataBuffer {
/// Writes a row to data buffer.
pub fn write_row(&mut self, pk_index: PkIndex, kv: &KeyValue) {
self.ts_builder.push_value_ref(kv.timestamp());
self.ts_builder.push_value_ref(&kv.timestamp());
self.pk_index_builder.push(Some(pk_index));
self.sequence_builder.push(Some(kv.sequence()));
self.op_type_builder.push(Some(kv.op_type() as u8));
@@ -229,7 +229,7 @@ impl DataBuffer {
for (idx, field) in kv.fields().enumerate() {
self.field_builders[idx]
.get_or_create_builder(self.ts_builder.len())
.push_value_ref(field);
.push_value_ref(&field);
}
}

View File

@@ -355,7 +355,7 @@ impl LastFieldsBuilder {
// Builds last fields.
for (builder, value) in self.builders.iter_mut().zip(&self.last_fields) {
// Safety: Vectors of the batch has the same type.
builder.push_value_ref(value.as_value_ref());
builder.push_value_ref(&value.as_value_ref());
}
let fields = self
.builders

View File

@@ -394,7 +394,7 @@ fn new_repeated_vector(
) -> common_recordbatch::error::Result<VectorRef> {
let mut mutable_vector = data_type.create_mutable_vector(1);
mutable_vector
.try_push_value_ref(value.as_value_ref())
.try_push_value_ref(&value.as_value_ref())
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
// This requires an additional allocation.

View File

@@ -182,7 +182,7 @@ impl WriteRequest {
ensure!(
is_column_type_value_eq(
input_col.datatype,
input_col.datatype_extension,
input_col.datatype_extension.clone(),
&column.column_schema.data_type
),
InvalidRequestSnafu {
@@ -411,13 +411,7 @@ impl WriteRequest {
};
// Convert default value into proto's value.
to_proto_value(default_value).with_context(|| InvalidRequestSnafu {
region_id: self.region_id,
reason: format!(
"no protobuf type for default value of column {} ({:?})",
column.column_schema.name, column.column_schema.data_type
),
})
Ok(to_proto_value(default_value))
}
}

View File

@@ -540,15 +540,15 @@ mod tests {
for (text_english_case_sensitive, text_english_case_insensitive, text_chinese) in rows {
match text_english_case_sensitive {
Some(s) => vec_english_sensitive.push_value_ref((*s).into()),
Some(s) => vec_english_sensitive.push_value_ref(&(*s).into()),
None => vec_english_sensitive.push_null(),
}
match text_english_case_insensitive {
Some(s) => vec_english_insensitive.push_value_ref((*s).into()),
Some(s) => vec_english_insensitive.push_value_ref(&(*s).into()),
None => vec_english_insensitive.push_null(),
}
match text_chinese {
Some(s) => vec_chinese.push_value_ref((*s).into()),
Some(s) => vec_chinese.push_value_ref(&(*s).into()),
None => vec_chinese.push_null(),
}
}

View File

@@ -669,14 +669,14 @@ impl FlatConvertFormat {
match decoded {
CompositeValues::Dense(dense) => {
if pk_index < dense.len() {
builder.push_value_ref(dense[pk_index].1.as_value_ref());
builder.push_value_ref(&dense[pk_index].1.as_value_ref());
} else {
builder.push_null();
}
}
CompositeValues::Sparse(sparse) => {
let value = sparse.get_or_null(column_id);
builder.push_value_ref(value.as_value_ref());
builder.push_value_ref(&value.as_value_ref());
}
};
}

View File

@@ -747,7 +747,7 @@ impl PrimaryKeyReadFormat {
for value_opt in values {
match value_opt {
// Safety: We use the same data type to create the converter.
Some(v) => builder.push_value_ref(v.as_value_ref()),
Some(v) => builder.push_value_ref(&v.as_value_ref()),
None => builder.push_null(),
}
}

View File

@@ -112,7 +112,7 @@ fn validate_rows(rows: &Option<Rows>) -> Result<()> {
for (col_idx, schema) in rows.schema.iter().enumerate() {
let column_type =
ColumnDataTypeWrapper::try_new(schema.datatype, schema.datatype_extension)
ColumnDataTypeWrapper::try_new(schema.datatype, schema.datatype_extension.clone())
.context(ColumnDataTypeSnafu)?
.into();
@@ -172,7 +172,7 @@ pub fn columns_to_rows(columns: Vec<Column>, row_count: u32) -> Result<Rows> {
column_name: column.column_name.clone(),
datatype: column.datatype,
semantic_type: column.semantic_type,
datatype_extension: column.datatype_extension,
datatype_extension: column.datatype_extension.clone(),
options: column.options.clone(),
};
schema.push(column_schema);
@@ -292,6 +292,8 @@ fn push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> {
),
(Decimal128, Decimal128Value, decimal128_values),
(Vector, BinaryValue, binary_values),
(List, ListValue, list_values),
(Struct, StructValue, struct_values),
);
Ok(())

View File

@@ -38,7 +38,7 @@ pub fn find_all_impure_columns(table_info: &TableInfo) -> Vec<ColumnSchema> {
/// Fill impure default values in the request
pub struct ImpureDefaultFiller {
impure_columns: HashMap<String, (api::v1::ColumnSchema, Option<api::v1::Value>)>,
impure_columns: HashMap<String, (api::v1::ColumnSchema, api::v1::Value)>,
}
impl ImpureDefaultFiller {
@@ -100,7 +100,7 @@ impl ImpureDefaultFiller {
.iter()
.filter_map(|(name, (schema, val))| {
if !impure_columns_in_reqs.contains(name) {
Some((schema.clone(), val.clone().unwrap_or_default()))
Some((schema.clone(), val.clone()))
} else {
None
}

View File

@@ -280,7 +280,7 @@ fn values_to_vectors_by_valid_types(
fn value_to_vector(value: Value) -> VectorRef {
let data_type = value.data_type();
let mut mutable_vector = data_type.create_mutable_vector(1);
mutable_vector.push_value_ref(value.as_value_ref());
mutable_vector.push_value_ref(&value.as_value_ref());
mutable_vector.to_vector()
}

View File

@@ -1784,7 +1784,7 @@ fn find_partition_entries(
.unwrap();
let column_name = &column.name;
let data_type = ConcreteDataType::from(
ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension)
ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
.context(ColumnDataTypeSnafu)?,
);
Ok((column_name, data_type))

View File

@@ -119,7 +119,7 @@ impl<'a> SplitReadRowHelper<'a> {
idx.as_ref().map_or(Value::Null, |idx| {
helper::pb_value_to_value_ref(
&row.values[*idx],
&self.schema[*idx].datatype_extension,
self.schema[*idx].datatype_extension.as_ref(),
)
.into()
})

View File

@@ -567,7 +567,7 @@ impl HistogramFoldStream {
// "sample" normal columns
for normal_index in &self.normal_indices {
let val = batch.column(*normal_index).get(cursor);
self.output_buffer[*normal_index].push_value_ref(val.as_value_ref());
self.output_buffer[*normal_index].push_value_ref(&val.as_value_ref());
}
// "fold" `le` and field columns
let le_array = batch.column(self.le_column_index);
@@ -594,7 +594,7 @@ impl HistogramFoldStream {
}
// ignore invalid data
let result = Self::evaluate_row(self.quantile, &bucket, &counters).unwrap_or(f64::NAN);
self.output_buffer[self.field_column_index].push_value_ref(ValueRef::from(result));
self.output_buffer[self.field_column_index].push_value_ref(&ValueRef::from(result));
cursor += bucket_num;
remaining_rows -= bucket_num;
self.output_buffered_rows += 1;

View File

@@ -496,7 +496,7 @@ async fn dryrun_pipeline_inner(
let mut map = Map::new();
let value_ref = pb_value_to_value_ref(
&v,
&result_schema[idx].datatype_extension,
result_schema[idx].datatype_extension.as_ref(),
);
let greptime_value: datatypes::value::Value = value_ref.into();
let serde_json_value =

View File

@@ -237,6 +237,17 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
"[{}]",
v.items().iter().map(|x| x.to_string()).join(", ")
))?,
Value::Struct(struct_value) => row_writer.write_col(format!(
"{{{}}}",
struct_value
.struct_type()
.fields()
.iter()
.map(|f| f.name())
.zip(struct_value.items().iter())
.map(|(k, v)| format!("{k}: {v}"))
.join(", ")
))?,
Value::Time(v) => row_writer
.write_col(v.to_timezone_aware_string(Some(&query_context.timezone())))?,
Value::Decimal128(v) => row_writer.write_col(v.to_string())?,
@@ -293,6 +304,7 @@ pub(crate) fn create_mysql_column(
ConcreteDataType::Json(_) => Ok(ColumnType::MYSQL_TYPE_JSON),
ConcreteDataType::Vector(_) => Ok(ColumnType::MYSQL_TYPE_BLOB),
ConcreteDataType::List(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
ConcreteDataType::Struct(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
_ => error::UnsupportedDataTypeSnafu {
data_type,
reason: "not implemented",

View File

@@ -28,7 +28,7 @@ use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::prelude::{ConcreteDataType, Value};
use datatypes::schema::Schema;
use datatypes::types::{IntervalType, TimestampType, json_type_value_to_string};
use datatypes::value::ListValue;
use datatypes::value::{ListValue, StructValue};
use pgwire::api::Type;
use pgwire::api::portal::{Format, Portal};
use pgwire::api::results::{DataRowEncoder, FieldInfo};
@@ -62,6 +62,14 @@ pub(super) fn schema_to_pg(origin: &Schema, field_formats: &Format) -> Result<Ve
.collect::<Result<Vec<FieldInfo>>>()
}
fn encode_struct(
_query_ctx: &QueryContextRef,
_struct_value: &StructValue,
_builder: &mut DataRowEncoder,
) -> PgWireResult<()> {
todo!("how to encode struct for postgres");
}
fn encode_array(
query_ctx: &QueryContextRef,
value_list: &ListValue,
@@ -428,6 +436,7 @@ pub(super) fn encode_value(
})),
},
Value::List(values) => encode_array(query_ctx, values, builder),
Value::Struct(values) => encode_struct(query_ctx, values, builder),
}
}
@@ -467,10 +476,12 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
&ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC_ARRAY),
&ConcreteDataType::Json(_) => Ok(Type::JSON_ARRAY),
&ConcreteDataType::Duration(_) => Ok(Type::INTERVAL_ARRAY),
// TODO(sunng87) we may treat list/array as json directly so we can
// support deeply nested data structures
&ConcreteDataType::Struct(_) => Ok(Type::RECORD_ARRAY),
&ConcreteDataType::Dictionary(_)
| &ConcreteDataType::Vector(_)
| &ConcreteDataType::List(_)
| &ConcreteDataType::Struct(_) => server_error::UnsupportedDataTypeSnafu {
| &ConcreteDataType::List(_) => server_error::UnsupportedDataTypeSnafu {
data_type: origin,
reason: "not implemented",
}
@@ -482,11 +493,7 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
}
.fail(),
&ConcreteDataType::Duration(_) => Ok(Type::INTERVAL),
&ConcreteDataType::Struct(_) => server_error::UnsupportedDataTypeSnafu {
data_type: origin,
reason: "not implemented",
}
.fail(),
&ConcreteDataType::Struct(_) => Ok(Type::RECORD),
}
}