feat: struct vector (#6595)

* feat: struct vector

Signed-off-by: discord9 <discord9@163.com>

* fix: array2vector & arrow type2concrete type

Signed-off-by: discord9 <discord9@163.com>

* chore: clippy

Signed-off-by: discord9 <discord9@163.com>

* chore: resolve some todos

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
Author: discord9
Date: 2025-07-29 16:22:27 +08:00
Committed by: GitHub
Parent: 5377db5392
Commit: f07b1daed4
14 changed files with 389 additions and 10 deletions

View File

@@ -291,6 +291,7 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
ConcreteDataType::Vector(_) => ColumnDataType::Vector,
ConcreteDataType::Null(_)
| ConcreteDataType::List(_)
| ConcreteDataType::Struct(_)
| ConcreteDataType::Dictionary(_)
| ConcreteDataType::Duration(_) => {
return error::IntoColumnDataTypeSnafu { from: datatype }.fail()
@@ -703,6 +704,7 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) ->
ConcreteDataType::Vector(_) => Arc::new(BinaryVector::from_vec(values.binary_values)),
ConcreteDataType::Null(_)
| ConcreteDataType::List(_)
| ConcreteDataType::Struct(_)
| ConcreteDataType::Dictionary(_)
| ConcreteDataType::Duration(_)
| ConcreteDataType::Json(_) => {
@@ -864,6 +866,7 @@ pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec<
ConcreteDataType::Vector(_) => values.binary_values.into_iter().map(|v| v.into()).collect(),
ConcreteDataType::Null(_)
| ConcreteDataType::List(_)
| ConcreteDataType::Struct(_)
| ConcreteDataType::Dictionary(_)
| ConcreteDataType::Duration(_)
| ConcreteDataType::Json(_) => {
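Struct columns are not yet representable in the gRPC column protocol, so each conversion path in this file now routes ConcreteDataType::Struct to the same error arm as List and Dictionary. A minimal sketch of the expected behaviour, assuming the ColumnDataTypeWrapper conversion shown above:

// Hypothetical check: mapping a struct type onto a gRPC ColumnDataType
// should fail, just like List and Dictionary do today.
let struct_type = ConcreteDataType::struct_datatype(vec![].into());
assert!(ColumnDataTypeWrapper::try_from(struct_type).is_err());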

View File

@@ -70,7 +70,7 @@ macro_rules! convert_arrow_array_to_grpc_vals {
return Ok(vals);
},
)+
ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) | ConcreteDataType::Duration(_) | ConcreteDataType::Json(_) => unreachable!("Should not send {:?} in gRPC", $data_type),
ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Struct(_) | ConcreteDataType::Dictionary(_) | ConcreteDataType::Duration(_) | ConcreteDataType::Json(_) => unreachable!("Should not send {:?} in gRPC", $data_type),
}
}};
}

View File

@@ -34,9 +34,9 @@ use crate::types::{
DurationMillisecondType, DurationNanosecondType, DurationSecondType, DurationType, Float32Type,
Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, IntervalDayTimeType,
IntervalMonthDayNanoType, IntervalType, IntervalYearMonthType, JsonType, ListType, NullType,
StringType, TimeMillisecondType, TimeType, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType, TimestampType, UInt16Type, UInt32Type,
UInt64Type, UInt8Type, VectorType,
StringType, StructType, TimeMillisecondType, TimeType, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, TimestampType,
UInt16Type, UInt32Type, UInt64Type, UInt8Type, VectorType,
};
use crate::value::Value;
use crate::vectors::MutableVector;
@@ -80,6 +80,7 @@ pub enum ConcreteDataType {
// Compound types:
List(ListType),
Dictionary(DictionaryType),
Struct(StructType),
// JSON type:
Json(JsonType),
@@ -131,6 +132,7 @@ impl fmt::Display for ConcreteDataType {
},
ConcreteDataType::Decimal128(v) => write!(f, "{}", v.name()),
ConcreteDataType::List(v) => write!(f, "{}", v.name()),
ConcreteDataType::Struct(v) => write!(f, "{}", v.name()),
ConcreteDataType::Dictionary(v) => write!(f, "{}", v.name()),
ConcreteDataType::Json(v) => write!(f, "{}", v.name()),
ConcreteDataType::Vector(v) => write!(f, "{}", v.name()),
@@ -406,9 +408,12 @@ impl ConcreteDataType {
&ConcreteDataType::Duration(_)
| &ConcreteDataType::Dictionary(_)
| &ConcreteDataType::Vector(_)
| &ConcreteDataType::List(_) => "UNKNOWN",
| &ConcreteDataType::List(_)
| &ConcreteDataType::Struct(_) => "UNKNOWN",
},
&ConcreteDataType::Duration(_) | &ConcreteDataType::Dictionary(_) => "UNKNOWN",
&ConcreteDataType::Duration(_)
| &ConcreteDataType::Dictionary(_)
| &ConcreteDataType::Struct(_) => "UNKNOWN",
}
}
}
@@ -457,7 +462,20 @@ impl TryFrom<&ArrowDataType> for ConcreteDataType {
ArrowDataType::Decimal128(precision, scale) => {
ConcreteDataType::decimal128_datatype(*precision, *scale)
}
_ => {
ArrowDataType::Struct(fields) => ConcreteDataType::Struct(fields.try_into()?),
ArrowDataType::Float16
| ArrowDataType::Date64
| ArrowDataType::FixedSizeBinary(_)
| ArrowDataType::BinaryView
| ArrowDataType::Utf8View
| ArrowDataType::ListView(_)
| ArrowDataType::FixedSizeList(_, _)
| ArrowDataType::LargeList(_)
| ArrowDataType::LargeListView(_)
| ArrowDataType::Union(_, _)
| ArrowDataType::Decimal256(_, _)
| ArrowDataType::Map(_, _)
| ArrowDataType::RunEndEncoded(_, _) => {
return error::UnsupportedArrowTypeSnafu {
arrow_type: dt.clone(),
}
@@ -613,6 +631,10 @@ impl ConcreteDataType {
ConcreteDataType::List(ListType::new(item_type))
}
pub fn struct_datatype(fields: StructType) -> ConcreteDataType {
ConcreteDataType::Struct(fields)
}
pub fn dictionary_datatype(
key_type: ConcreteDataType,
value_type: ConcreteDataType,
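With the new Struct(StructType) variant, a struct data type can be built from fields and mapped to Arrow like the other compound types. A minimal sketch, assuming the StructField/StructType constructors added later in this commit and the usual int64_datatype/string_datatype helpers:

use arrow::datatypes::DataType as ArrowDataType;

let fields = StructType::new(vec![
    StructField::new("id".to_string(), ConcreteDataType::int64_datatype(), false),
    StructField::new("name".to_string(), ConcreteDataType::string_datatype(), true),
]);
let dt = ConcreteDataType::struct_datatype(fields);
// Display uses the field names; the Arrow mapping yields ArrowDataType::Struct.
assert_eq!(dt.to_string(), "Struct<id, name>");
assert!(matches!(dt.as_arrow_type(), ArrowDataType::Struct(_)));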

View File

@@ -65,6 +65,7 @@ pub enum LogicalTypeId {
List,
Dictionary,
Struct,
Json,
@@ -108,6 +109,7 @@ impl LogicalTypeId {
LogicalTypeId::List => {
ConcreteDataType::list_datatype(ConcreteDataType::null_datatype())
}
LogicalTypeId::Struct => ConcreteDataType::struct_datatype(vec![].into()),
LogicalTypeId::Dictionary => ConcreteDataType::dictionary_datatype(
ConcreteDataType::null_datatype(),
ConcreteDataType::null_datatype(),
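LogicalTypeId::Struct maps back to an empty struct datatype here, and the reverse direction holds as well. A small hedged sketch:

// The logical type id of any struct datatype is LogicalTypeId::Struct.
let dt = ConcreteDataType::struct_datatype(vec![].into());
assert_eq!(dt.logical_type_id(), LogicalTypeId::Struct);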

View File

@@ -25,6 +25,7 @@ mod list_type;
mod null_type;
mod primitive_type;
mod string_type;
mod struct_type;
mod time_type;
mod timestamp_type;
mod vector_type;
@@ -52,6 +53,7 @@ pub use primitive_type::{
OrdPrimitive, UInt16Type, UInt32Type, UInt64Type, UInt8Type, WrapperType,
};
pub use string_type::StringType;
pub use struct_type::{StructField, StructType};
pub use time_type::{
TimeMicrosecondType, TimeMillisecondType, TimeNanosecondType, TimeSecondType, TimeType,
};

View File

@@ -0,0 +1,134 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow::datatypes::{DataType as ArrowDataType, Field};
use arrow_schema::Fields;
use serde::{Deserialize, Serialize};
use crate::prelude::{ConcreteDataType, DataType, LogicalTypeId};
use crate::value::Value;
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct StructType {
fields: Vec<StructField>,
}
impl TryFrom<&Fields> for StructType {
type Error = crate::error::Error;
fn try_from(value: &Fields) -> Result<Self, Self::Error> {
let fields = value
.iter()
.map(|field| {
Ok(StructField::new(
field.name().to_string(),
ConcreteDataType::try_from(field.data_type())?,
field.is_nullable(),
))
})
.collect::<Result<Vec<StructField>, Self::Error>>()?;
Ok(StructType { fields })
}
}
impl From<Vec<StructField>> for StructType {
fn from(fields: Vec<StructField>) -> Self {
StructType { fields }
}
}
impl DataType for StructType {
fn name(&self) -> String {
format!(
"Struct<{}>",
self.fields
.iter()
.map(|f| f.name())
.collect::<Vec<_>>()
.join(", ")
)
}
fn logical_type_id(&self) -> LogicalTypeId {
LogicalTypeId::Struct
}
fn default_value(&self) -> Value {
Value::Null
}
fn as_arrow_type(&self) -> ArrowDataType {
let fields = self
.fields
.iter()
.map(|f| Field::new(f.name.clone(), f.data_type.as_arrow_type(), f.nullable))
.collect();
ArrowDataType::Struct(fields)
}
fn create_mutable_vector(&self, _capacity: usize) -> Box<dyn crate::prelude::MutableVector> {
unimplemented!("What is the mutable vector for StructVector?");
}
fn try_cast(&self, _from: Value) -> Option<Value> {
// TODO(discord9): what is the meaning of casting from Value to StructFields?
None
}
}
impl StructType {
pub fn new(fields: Vec<StructField>) -> Self {
StructType { fields }
}
pub fn fields(&self) -> &[StructField] {
&self.fields
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct StructField {
name: String,
data_type: ConcreteDataType,
nullable: bool,
}
impl StructField {
pub fn new(name: String, data_type: ConcreteDataType, nullable: bool) -> Self {
StructField {
name,
data_type,
nullable,
}
}
pub fn name(&self) -> &str {
&self.name
}
pub fn data_type(&self) -> &ConcreteDataType {
&self.data_type
}
pub fn is_nullable(&self) -> bool {
self.nullable
}
pub fn to_df_field(&self) -> Field {
Field::new(
self.name.clone(),
self.data_type.as_arrow_type(),
self.nullable,
)
}
}
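The TryFrom<&Fields> impl is what ties Arrow schemas back to StructType, and as_arrow_type rebuilds the equivalent Arrow struct type. A hedged sketch of the round trip, assuming field metadata is empty on both sides:

use arrow_schema::{DataType as ArrowDataType, Field, Fields};

let arrow_fields = Fields::from(vec![
    Field::new("x", ArrowDataType::Float64, true),
    Field::new("y", ArrowDataType::Float64, true),
]);
let struct_type = StructType::try_from(&arrow_fields).expect("field types are supported");
assert_eq!(struct_type.fields().len(), 2);
// Converting back should yield the equivalent Arrow struct type.
assert_eq!(struct_type.as_arrow_type(), ArrowDataType::Struct(arrow_fields));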

View File

@@ -28,6 +28,7 @@ use common_time::interval::IntervalUnit;
use common_time::time::Time;
use common_time::timestamp::{TimeUnit, Timestamp};
use common_time::{Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timezone};
use datafusion_common::scalar::ScalarStructBuilder;
use datafusion_common::ScalarValue;
use greptime_proto::v1::value::ValueData;
pub use ordered_float::OrderedFloat;
@@ -540,6 +541,14 @@ pub fn to_null_scalar_value(output_type: &ConcreteDataType) -> Result<ScalarValu
Arc::new(new_item_field(output_type.as_arrow_type())),
0,
))),
ConcreteDataType::Struct(fields) => {
let fields = fields
.fields()
.iter()
.map(|f| f.to_df_field())
.collect::<Vec<_>>();
ScalarStructBuilder::new_null(fields)
}
ConcreteDataType::Dictionary(dict) => ScalarValue::Dictionary(
Box::new(dict.key_type().as_arrow_type()),
Box::new(to_null_scalar_value(dict.value_type())?),
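For struct columns, the null scalar is delegated to DataFusion's ScalarStructBuilder, which builds a single-row, all-null struct scalar from the field list. A minimal sketch, assuming a two-field struct type and the usual datatype constructor helpers:

let struct_type = StructType::new(vec![
    StructField::new("a".to_string(), ConcreteDataType::int64_datatype(), true),
    StructField::new("b".to_string(), ConcreteDataType::string_datatype(), true),
]);
let scalar = to_null_scalar_value(&ConcreteDataType::Struct(struct_type))
    .expect("struct null scalar");
// The result is a ScalarValue::Struct whose single row is null.
assert!(matches!(scalar, ScalarValue::Struct(_)));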

View File

@@ -40,6 +40,7 @@ mod null;
pub(crate) mod operations;
mod primitive;
mod string;
mod struct_vector;
mod time;
mod timestamp;
mod validity;

View File

@@ -19,6 +19,7 @@ use common_time::interval::IntervalUnit;
use crate::data_type::DataType;
use crate::types::{DurationType, TimeType, TimestampType};
use crate::vectors::constant::ConstantVector;
use crate::vectors::struct_vector::StructVector;
use crate::vectors::{
BinaryVector, BooleanVector, DateVector, Decimal128Vector, DurationMicrosecondVector,
DurationMillisecondVector, DurationNanosecondVector, DurationSecondVector,
@@ -109,6 +110,7 @@ fn equal(lhs: &dyn Vector, rhs: &dyn Vector) -> bool {
}
},
List(_) => is_vector_eq!(ListVector, lhs, rhs),
Struct(_) => is_vector_eq!(StructVector, lhs, rhs),
UInt8(_) | UInt16(_) | UInt32(_) | UInt64(_) | Int8(_) | Int16(_) | Int32(_) | Int64(_)
| Float32(_) | Float64(_) | Dictionary(_) => {
with_match_primitive_type_id!(lhs_type.logical_type_id(), |$T| {

View File

@@ -21,7 +21,7 @@ use arrow::array::{Array, ArrayRef, StringArray};
use arrow::compute;
use arrow::compute::kernels::comparison;
use arrow::datatypes::{DataType as ArrowDataType, Int64Type, TimeUnit};
use arrow_array::DictionaryArray;
use arrow_array::{DictionaryArray, StructArray};
use arrow_schema::IntervalUnit;
use datafusion_common::ScalarValue;
use snafu::{OptionExt, ResultExt};
@@ -31,6 +31,7 @@ use crate::error::{self, ConvertArrowArrayToScalarsSnafu, Result};
use crate::prelude::DataType;
use crate::scalars::{Scalar, ScalarVectorBuilder};
use crate::value::{ListValue, ListValueRef, Value};
use crate::vectors::struct_vector::StructVector;
use crate::vectors::{
BinaryVector, BooleanVector, ConstantVector, DateVector, Decimal128Vector, DictionaryVector,
DurationMicrosecondVector, DurationMillisecondVector, DurationNanosecondVector,
@@ -359,10 +360,18 @@ impl Helper {
ConcreteDataType::try_from(value.as_ref())?,
)?)
}
ArrowDataType::Struct(_fields) => {
let array = array
.as_ref()
.as_any()
.downcast_ref::<StructArray>()
.unwrap();
Arc::new(StructVector::new(array.clone())?)
}
ArrowDataType::Float16
| ArrowDataType::LargeList(_)
| ArrowDataType::FixedSizeList(_, _)
| ArrowDataType::Struct(_)
| ArrowDataType::Union(_, _)
| ArrowDataType::Dictionary(_, _)
| ArrowDataType::Decimal256(_, _)
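Helper::try_into_vector now accepts Arrow struct arrays and wraps them in the new StructVector instead of rejecting the type. A hedged sketch of the conversion with a hand-built StructArray:

use std::sync::Arc;
use arrow_array::{ArrayRef, Int64Array, StringArray, StructArray};
use arrow_schema::{DataType as ArrowDataType, Field};

let struct_array = StructArray::from(vec![
    (
        Arc::new(Field::new("id", ArrowDataType::Int64, false)),
        Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
    ),
    (
        Arc::new(Field::new("name", ArrowDataType::Utf8, true)),
        Arc::new(StringArray::from(vec![Some("a"), None, Some("c")])) as ArrayRef,
    ),
]);
let vector = Helper::try_into_vector(Arc::new(struct_array) as ArrayRef)
    .expect("struct arrays are now supported");
assert_eq!(vector.len(), 3);
assert_eq!(vector.vector_type_name(), "StructVector");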

View File

@@ -0,0 +1,181 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use arrow::compute::TakeOptions;
use arrow_array::{Array, ArrayRef, StructArray};
use serde_json::Value as JsonValue;
use snafu::ResultExt;
use crate::error::{self, ArrowComputeSnafu, Result, UnsupportedOperationSnafu};
use crate::prelude::ConcreteDataType;
use crate::serialize::Serializable;
use crate::value::{Value, ValueRef};
use crate::vectors::operations::VectorOp;
use crate::vectors::{self, Helper, Validity, Vector, VectorRef};
/// A simple wrapper around `StructArray` to represent a vector of structs in GreptimeDB.
#[derive(Debug, PartialEq)]
pub struct StructVector {
array: StructArray,
data_type: ConcreteDataType,
}
#[allow(unused)]
impl StructVector {
pub fn new(array: StructArray) -> Result<Self> {
let fields = array.fields();
let data_type = ConcreteDataType::Struct(fields.try_into()?);
Ok(StructVector { array, data_type })
}
pub fn array(&self) -> &StructArray {
&self.array
}
pub fn as_arrow(&self) -> &dyn Array {
&self.array
}
}
impl Vector for StructVector {
fn data_type(&self) -> ConcreteDataType {
self.data_type.clone()
}
fn vector_type_name(&self) -> String {
"StructVector".to_string()
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn len(&self) -> usize {
self.array.len()
}
fn to_arrow_array(&self) -> ArrayRef {
Arc::new(self.array.clone())
}
fn to_boxed_arrow_array(&self) -> Box<dyn Array> {
Box::new(self.array.clone())
}
fn validity(&self) -> Validity {
vectors::impl_validity_for_vector!(self.array)
}
fn memory_size(&self) -> usize {
self.array.get_buffer_memory_size()
}
fn null_count(&self) -> usize {
self.array.null_count()
}
fn is_null(&self, row: usize) -> bool {
self.array.is_null(row)
}
fn slice(&self, offset: usize, length: usize) -> VectorRef {
Arc::new(StructVector {
array: self.array.slice(offset, length),
data_type: self.data_type.clone(),
})
}
fn get(&self, _: usize) -> Value {
unimplemented!("StructValue not supported yet")
}
fn get_ref(&self, _: usize) -> ValueRef {
unimplemented!("StructValue not supported yet")
}
}
impl VectorOp for StructVector {
fn replicate(&self, offsets: &[usize]) -> VectorRef {
let column_arrays = self
.array
.columns()
.iter()
.map(|col| {
let vector = Helper::try_into_vector(col)
.expect("Failed to replicate struct vector columns");
vector.replicate(offsets).to_arrow_array()
})
.collect::<Vec<_>>();
let replicated_array = StructArray::new(
self.array.fields().clone(),
column_arrays,
self.array.nulls().cloned(),
);
Arc::new(
StructVector::new(replicated_array).expect("Failed to create replicated StructVector"),
)
}
fn cast(&self, _to_type: &ConcreteDataType) -> Result<VectorRef> {
UnsupportedOperationSnafu {
op: "cast",
vector_type: self.vector_type_name(),
}
.fail()
}
fn filter(&self, filter: &vectors::BooleanVector) -> Result<VectorRef> {
let filtered =
datafusion_common::arrow::compute::filter(&self.array, filter.as_boolean_array())
.context(ArrowComputeSnafu)
.and_then(Helper::try_into_vector)?;
Ok(filtered)
}
fn take(&self, indices: &vectors::UInt32Vector) -> Result<VectorRef> {
let take_result = datafusion_common::arrow::compute::take(
&self.array,
indices.as_arrow(),
Some(TakeOptions { check_bounds: true }),
)
.context(ArrowComputeSnafu)
.and_then(Helper::try_into_vector)?;
Ok(take_result)
}
}
impl Serializable for StructVector {
fn serialize_to_json(&self) -> Result<Vec<serde_json::Value>> {
let mut result = serde_json::Map::new();
for (field, value) in self.array.fields().iter().zip(self.array.columns().iter()) {
let value_vector = Helper::try_into_vector(value)?;
let field_value = value_vector.serialize_to_json()?;
result.insert(field.name().clone(), JsonValue::Array(field_value));
}
let fields = JsonValue::Object(result);
let data_type = serde_json::to_value(&self.data_type).context(error::SerializeSnafu)?;
Ok(vec![JsonValue::Object(
[
("fields".to_string(), fields),
("data_type".to_string(), data_type),
]
.iter()
.cloned()
.collect(),
)])
}
}
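Row-level operations are delegated to Arrow compute kernels (filter, take) or rebuilt column by column (replicate), then wrapped back into a StructVector through Helper. A minimal usage sketch, assuming a StructArray built as in the earlier example and the usual UInt32Vector::from_slice helper:

// Assume struct_array is a two-column StructArray as in the earlier sketch.
let vector = StructVector::new(struct_array).expect("fields map to supported concrete types");
// Slicing keeps the same fields and data type.
let head = vector.slice(0, 2);
assert_eq!(head.len(), 2);
// take() goes through arrow's take kernel with bounds checking enabled.
let picked = vector
    .take(&UInt32Vector::from_slice([2u32, 0]))
    .expect("indices are in bounds");
assert_eq!(picked.len(), 2);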

View File

@@ -78,6 +78,7 @@ impl SortField {
ConcreteDataType::Decimal128(_) => 19,
ConcreteDataType::Null(_)
| ConcreteDataType::List(_)
| ConcreteDataType::Struct(_)
| ConcreteDataType::Dictionary(_) => 0,
}
}
@@ -134,6 +135,7 @@ impl SortField {
.context(SerializeFieldSnafu)?;
}
ConcreteDataType::List(_) |
ConcreteDataType::Struct(_) |
ConcreteDataType::Dictionary(_) |
ConcreteDataType::Null(_) => {
return error::NotSupportedFieldSnafu {
@@ -218,6 +220,10 @@ impl SortField {
data_type: ConcreteDataType::List(l.clone()),
}
.fail(),
ConcreteDataType::Struct(f) => NotSupportedFieldSnafu {
data_type: ConcreteDataType::Struct(f.clone()),
}
.fail(),
ConcreteDataType::Dictionary(d) => NotSupportedFieldSnafu {
data_type: ConcreteDataType::Dictionary(d.clone()),
}
@@ -301,6 +307,7 @@ impl SortField {
ConcreteDataType::Decimal128(_) => 19,
ConcreteDataType::Null(_)
| ConcreteDataType::List(_)
| ConcreteDataType::Struct(_)
| ConcreteDataType::Dictionary(_) => 0,
};
deserializer.advance(to_skip);

View File

@@ -469,7 +469,8 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
&ConcreteDataType::Duration(_) => Ok(Type::INTERVAL_ARRAY),
&ConcreteDataType::Dictionary(_)
| &ConcreteDataType::Vector(_)
| &ConcreteDataType::List(_) => server_error::UnsupportedDataTypeSnafu {
| &ConcreteDataType::List(_)
| &ConcreteDataType::Struct(_) => server_error::UnsupportedDataTypeSnafu {
data_type: origin,
reason: "not implemented",
}
@@ -481,6 +482,11 @@ pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
}
.fail(),
&ConcreteDataType::Duration(_) => Ok(Type::INTERVAL),
&ConcreteDataType::Struct(_) => server_error::UnsupportedDataTypeSnafu {
data_type: origin,
reason: "not implemented",
}
.fail(),
}
}

View File

@@ -308,6 +308,7 @@ pub fn concrete_data_type_to_sql_data_type(data_type: &ConcreteDataType) -> Resu
ConcreteDataType::Duration(_)
| ConcreteDataType::Null(_)
| ConcreteDataType::List(_)
| ConcreteDataType::Struct(_)
| ConcreteDataType::Dictionary(_) => error::ConcreteTypeNotSupportedSnafu {
t: data_type.clone(),
}
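On the SQL surface, struct columns still have no mapping, so the conversion is expected to fail until a SQL-level struct type is wired up. A hedged sketch:

// Struct has no SQL data type mapping yet; the conversion should return
// a ConcreteTypeNotSupported error.
let result = concrete_data_type_to_sql_data_type(&ConcreteDataType::struct_datatype(vec![].into()));
assert!(result.is_err());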