mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 12:22:55 +00:00
feat: convert values to vector directly (#1704)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -16,7 +16,6 @@ use std::collections::HashMap;
|
||||
|
||||
use api::helper::ColumnDataTypeWrapper;
|
||||
use api::v1::{Column, DeleteRequest as GrpcDeleteRequest};
|
||||
use datatypes::data_type::DataType;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use table::requests::DeleteRequest;
|
||||
@@ -41,14 +40,11 @@ pub fn to_table_delete_request(request: GrpcDeleteRequest) -> Result<DeleteReque
|
||||
let datatype: ConcreteDataType = ColumnDataTypeWrapper::try_new(datatype)
|
||||
.context(ColumnDataTypeSnafu)?
|
||||
.into();
|
||||
|
||||
let vector_builder = &mut datatype.create_mutable_vector(row_count);
|
||||
|
||||
add_values_to_builder(vector_builder, values, row_count, null_mask)?;
|
||||
let vector = add_values_to_builder(datatype, values, row_count, null_mask)?;
|
||||
|
||||
ensure!(
|
||||
key_column_values
|
||||
.insert(column_name.clone(), vector_builder.to_vector())
|
||||
.insert(column_name.clone(), vector)
|
||||
.is_none(),
|
||||
IllegalDeleteRequestSnafu {
|
||||
reason: format!("Duplicated column '{column_name}' in delete request.")
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::helper::ColumnDataTypeWrapper;
|
||||
use api::v1::column::{SemanticType, Values};
|
||||
@@ -25,10 +26,16 @@ use common_time::timestamp::Timestamp;
|
||||
use common_time::{Date, DateTime};
|
||||
use datatypes::data_type::{ConcreteDataType, DataType};
|
||||
use datatypes::prelude::{ValueRef, VectorRef};
|
||||
use datatypes::scalars::ScalarVector;
|
||||
use datatypes::schema::SchemaRef;
|
||||
use datatypes::types::TimestampType;
|
||||
use datatypes::types::{Int16Type, Int8Type, TimestampType, UInt16Type, UInt8Type};
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::MutableVector;
|
||||
use datatypes::vectors::{
|
||||
BinaryVector, BooleanVector, DateTimeVector, DateVector, Float32Vector, Float64Vector,
|
||||
Int32Vector, Int64Vector, PrimitiveVector, StringVector, TimestampMicrosecondVector,
|
||||
TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt32Vector,
|
||||
UInt64Vector,
|
||||
};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use table::metadata::TableId;
|
||||
use table::requests::InsertRequest;
|
||||
@@ -287,15 +294,10 @@ pub fn to_table_insert_request(
|
||||
let datatype: ConcreteDataType = ColumnDataTypeWrapper::try_new(datatype)
|
||||
.context(ColumnDataTypeSnafu)?
|
||||
.into();
|
||||
|
||||
let vector_builder = &mut datatype.create_mutable_vector(row_count);
|
||||
|
||||
add_values_to_builder(vector_builder, values, row_count, null_mask)?;
|
||||
let vector = add_values_to_builder(datatype, values, row_count, null_mask)?;
|
||||
|
||||
ensure!(
|
||||
columns_values
|
||||
.insert(column_name.clone(), vector_builder.to_vector())
|
||||
.is_none(),
|
||||
columns_values.insert(column_name.clone(), vector).is_none(),
|
||||
ColumnAlreadyExistsSnafu {
|
||||
column: column_name
|
||||
}
|
||||
@@ -312,28 +314,16 @@ pub fn to_table_insert_request(
|
||||
}
|
||||
|
||||
pub(crate) fn add_values_to_builder(
|
||||
builder: &mut Box<dyn MutableVector>,
|
||||
data_type: ConcreteDataType,
|
||||
values: Values,
|
||||
row_count: usize,
|
||||
null_mask: Vec<u8>,
|
||||
) -> Result<()> {
|
||||
let data_type = builder.data_type();
|
||||
let values = convert_values(&data_type, values);
|
||||
|
||||
) -> Result<VectorRef> {
|
||||
if null_mask.is_empty() {
|
||||
ensure!(
|
||||
values.len() == row_count,
|
||||
UnexpectedValuesLengthSnafu {
|
||||
reason: "If null_mask is empty, the length of values must be equal to row_count."
|
||||
}
|
||||
);
|
||||
|
||||
values.iter().try_for_each(|value| {
|
||||
builder
|
||||
.try_push_value_ref(value.as_value_ref())
|
||||
.context(CreateVectorSnafu)
|
||||
})?;
|
||||
Ok(values_to_vector(&data_type, values))
|
||||
} else {
|
||||
let builder = &mut data_type.create_mutable_vector(row_count);
|
||||
let values = convert_values(&data_type, values);
|
||||
let null_mask = BitVec::from_vec(null_mask);
|
||||
ensure!(
|
||||
null_mask.count_ones() + values.len() == row_count,
|
||||
@@ -354,8 +344,53 @@ pub(crate) fn add_values_to_builder(
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(builder.to_vector())
|
||||
}
|
||||
}
|
||||
|
||||
fn values_to_vector(data_type: &ConcreteDataType, values: Values) -> VectorRef {
|
||||
match data_type {
|
||||
ConcreteDataType::Boolean(_) => Arc::new(BooleanVector::from(values.bool_values)),
|
||||
ConcreteDataType::Int8(_) => Arc::new(PrimitiveVector::<Int8Type>::from_iter_values(
|
||||
values.i8_values.into_iter().map(|x| x as i8),
|
||||
)),
|
||||
ConcreteDataType::Int16(_) => Arc::new(PrimitiveVector::<Int16Type>::from_iter_values(
|
||||
values.i16_values.into_iter().map(|x| x as i16),
|
||||
)),
|
||||
ConcreteDataType::Int32(_) => Arc::new(Int32Vector::from_vec(values.i32_values)),
|
||||
ConcreteDataType::Int64(_) => Arc::new(Int64Vector::from_vec(values.i64_values)),
|
||||
ConcreteDataType::UInt8(_) => Arc::new(PrimitiveVector::<UInt8Type>::from_iter_values(
|
||||
values.u8_values.into_iter().map(|x| x as u8),
|
||||
)),
|
||||
ConcreteDataType::UInt16(_) => Arc::new(PrimitiveVector::<UInt16Type>::from_iter_values(
|
||||
values.u16_values.into_iter().map(|x| x as u16),
|
||||
)),
|
||||
ConcreteDataType::UInt32(_) => Arc::new(UInt32Vector::from_vec(values.u32_values)),
|
||||
ConcreteDataType::UInt64(_) => Arc::new(UInt64Vector::from_vec(values.u64_values)),
|
||||
ConcreteDataType::Float32(_) => Arc::new(Float32Vector::from_vec(values.f32_values)),
|
||||
ConcreteDataType::Float64(_) => Arc::new(Float64Vector::from_vec(values.f64_values)),
|
||||
ConcreteDataType::Binary(_) => Arc::new(BinaryVector::from(values.binary_values)),
|
||||
ConcreteDataType::String(_) => Arc::new(StringVector::from_vec(values.string_values)),
|
||||
ConcreteDataType::Date(_) => Arc::new(DateVector::from_vec(values.date_values)),
|
||||
ConcreteDataType::DateTime(_) => Arc::new(DateTimeVector::from_vec(values.datetime_values)),
|
||||
ConcreteDataType::Timestamp(unit) => match unit {
|
||||
TimestampType::Second(_) => {
|
||||
Arc::new(TimestampSecondVector::from_vec(values.ts_second_values))
|
||||
}
|
||||
TimestampType::Millisecond(_) => Arc::new(TimestampMillisecondVector::from_vec(
|
||||
values.ts_millisecond_values,
|
||||
)),
|
||||
TimestampType::Microsecond(_) => Arc::new(TimestampMicrosecondVector::from_vec(
|
||||
values.ts_microsecond_values,
|
||||
)),
|
||||
TimestampType::Nanosecond(_) => Arc::new(TimestampNanosecondVector::from_vec(
|
||||
values.ts_nanosecond_values,
|
||||
)),
|
||||
},
|
||||
ConcreteDataType::Null(_) | ConcreteDataType::List(_) | ConcreteDataType::Dictionary(_) => {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn convert_values(data_type: &ConcreteDataType, values: Values) -> Vec<Value> {
|
||||
|
||||
@@ -113,6 +113,14 @@ impl Vector for BinaryVector {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Vec<Vec<u8>>> for BinaryVector {
|
||||
fn from(data: Vec<Vec<u8>>) -> Self {
|
||||
Self {
|
||||
array: BinaryArray::from_iter_values(data),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ScalarVector for BinaryVector {
|
||||
type OwnedItem = Vec<u8>;
|
||||
type RefItem<'a> = &'a [u8];
|
||||
|
||||
@@ -130,6 +130,12 @@ impl<T: LogicalPrimitiveType> PrimitiveVector<T> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_iter_values<I: IntoIterator<Item = T::Native>>(iter: I) -> Self {
|
||||
Self {
|
||||
array: PrimitiveArray::from_iter_values(iter),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_values<I: IntoIterator<Item = T::Native>>(iter: I) -> Self {
|
||||
Self {
|
||||
array: PrimitiveArray::from_iter_values(iter),
|
||||
|
||||
Reference in New Issue
Block a user