mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
refactor(datatypes): enhance MutableVector methods (#987)
* refactor(datatypes): enhance MutableVector methods * refactor(datatypes): address code review issues * refactor(datatypes): address more code review issues * refactor(datatypes): fix merge conflicts * refactor(datatypes): address code review issues * refactor(datatypes): address more code review issues * refactor(datatypes): update sql delete with the newly introduced method
This commit is contained in:
@@ -162,16 +162,10 @@ fn tables_to_record_batch(
|
||||
|
||||
for table_name in table_names {
|
||||
// Safety: All these vectors are string type.
|
||||
catalog_vec
|
||||
.push_value_ref(ValueRef::String(catalog_name))
|
||||
.unwrap();
|
||||
schema_vec
|
||||
.push_value_ref(ValueRef::String(schema_name))
|
||||
.unwrap();
|
||||
table_name_vec
|
||||
.push_value_ref(ValueRef::String(&table_name))
|
||||
.unwrap();
|
||||
engine_vec.push_value_ref(ValueRef::String(engine)).unwrap();
|
||||
catalog_vec.push_value_ref(ValueRef::String(catalog_name));
|
||||
schema_vec.push_value_ref(ValueRef::String(schema_name));
|
||||
table_name_vec.push_value_ref(ValueRef::String(&table_name));
|
||||
engine_vec.push_value_ref(ValueRef::String(engine));
|
||||
}
|
||||
|
||||
vec![
|
||||
|
||||
@@ -96,9 +96,7 @@ pub fn column_to_vector(column: &Column, rows: u32) -> Result<VectorRef> {
|
||||
|
||||
for i in 0..rows {
|
||||
if let Some(true) = nulls_iter.next() {
|
||||
vector
|
||||
.push_value_ref(ValueRef::Null)
|
||||
.context(CreateVectorSnafu)?;
|
||||
vector.push_null();
|
||||
} else {
|
||||
let value_ref = values_iter
|
||||
.next()
|
||||
@@ -109,16 +107,12 @@ pub fn column_to_vector(column: &Column, rows: u32) -> Result<VectorRef> {
|
||||
),
|
||||
})?;
|
||||
vector
|
||||
.push_value_ref(value_ref)
|
||||
.try_push_value_ref(value_ref)
|
||||
.context(CreateVectorSnafu)?;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
(0..rows).try_for_each(|_| {
|
||||
vector
|
||||
.push_value_ref(ValueRef::Null)
|
||||
.context(CreateVectorSnafu)
|
||||
})?;
|
||||
(0..rows).for_each(|_| vector.push_null());
|
||||
}
|
||||
Ok(vector.to_vector())
|
||||
}
|
||||
@@ -324,7 +318,7 @@ fn add_values_to_builder(
|
||||
|
||||
values.iter().try_for_each(|value| {
|
||||
builder
|
||||
.push_value_ref(value.as_value_ref())
|
||||
.try_push_value_ref(value.as_value_ref())
|
||||
.context(CreateVectorSnafu)
|
||||
})?;
|
||||
} else {
|
||||
@@ -337,12 +331,10 @@ fn add_values_to_builder(
|
||||
let mut idx_of_values = 0;
|
||||
for idx in 0..row_count {
|
||||
match is_null(&null_mask, idx) {
|
||||
Some(true) => builder
|
||||
.push_value_ref(ValueRef::Null)
|
||||
.context(CreateVectorSnafu)?,
|
||||
Some(true) => builder.push_null(),
|
||||
_ => {
|
||||
builder
|
||||
.push_value_ref(values[idx_of_values].as_value_ref())
|
||||
.try_push_value_ref(values[idx_of_values].as_value_ref())
|
||||
.context(CreateVectorSnafu)?;
|
||||
idx_of_values += 1
|
||||
}
|
||||
|
||||
@@ -124,7 +124,7 @@ fn value_to_vector(column_name: &String, sql_value: &Value, table: &TableRef) ->
|
||||
match value {
|
||||
Ok(value) => {
|
||||
let mut vec = data_type.create_mutable_vector(1);
|
||||
if vec.push_value_ref(value.as_value_ref()).is_err() {
|
||||
if vec.try_push_value_ref(value.as_value_ref()).is_err() {
|
||||
return InvalidSqlSnafu {
|
||||
msg: format!(
|
||||
"invalid sql, column name is {column_name}, value is {sql_value}",
|
||||
|
||||
@@ -143,7 +143,7 @@ fn add_row_to_vector(
|
||||
statements::sql_value_to_value(&column_schema.name, &column_schema.data_type, sql_val)
|
||||
.context(ParseSqlSnafu)?
|
||||
};
|
||||
builder.push_value_ref(value.as_value_ref()).unwrap();
|
||||
builder.push_value_ref(value.as_value_ref());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -145,8 +145,7 @@ impl ColumnSchema {
|
||||
let value_ref = padding_value.as_value_ref();
|
||||
let mut mutable_vector = self.data_type.create_mutable_vector(num_rows);
|
||||
for _ in 0..num_rows {
|
||||
// Safety: Both the vector and default value are created by the data type.
|
||||
mutable_vector.push_value_ref(value_ref).unwrap();
|
||||
mutable_vector.push_value_ref(value_ref);
|
||||
}
|
||||
mutable_vector.to_vector()
|
||||
}
|
||||
|
||||
@@ -143,7 +143,7 @@ impl ColumnDefaultConstraint {
|
||||
// attempt to downcast the vector fail if they don't check whether the vector is const
|
||||
// first.
|
||||
let mut mutable_vector = data_type.create_mutable_vector(1);
|
||||
mutable_vector.push_value_ref(v.as_value_ref())?;
|
||||
mutable_vector.try_push_value_ref(v.as_value_ref())?;
|
||||
let base_vector = mutable_vector.to_vector();
|
||||
Ok(base_vector.replicate(&[num_rows]))
|
||||
}
|
||||
|
||||
@@ -172,10 +172,25 @@ pub trait MutableVector: Send + Sync {
|
||||
/// Convert `self` to an (immutable) [VectorRef] and reset `self`.
|
||||
fn to_vector(&mut self) -> VectorRef;
|
||||
|
||||
/// Try to push value ref to this mutable vector.
|
||||
fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()>;
|
||||
|
||||
/// Push value ref to this mutable vector.
|
||||
///
|
||||
/// Returns error if data types mismatch.
|
||||
fn push_value_ref(&mut self, value: ValueRef) -> Result<()>;
|
||||
/// # Panics
|
||||
/// Panics if error if data types mismatch.
|
||||
fn push_value_ref(&mut self, value: ValueRef) {
|
||||
self.try_push_value_ref(value).unwrap_or_else(|_| {
|
||||
panic!(
|
||||
"expecting pushing value of datatype {:?}, actual {:?}",
|
||||
self.data_type(),
|
||||
value
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
// Push null to this mutable vector.
|
||||
fn push_null(&mut self);
|
||||
|
||||
/// Extend this mutable vector by slice of `vector`.
|
||||
///
|
||||
|
||||
@@ -163,7 +163,7 @@ impl MutableVector for BinaryVectorBuilder {
|
||||
Arc::new(self.finish())
|
||||
}
|
||||
|
||||
fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
|
||||
fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
|
||||
match value.as_binary()? {
|
||||
Some(v) => self.mutable_array.append_value(v),
|
||||
None => self.mutable_array.append_null(),
|
||||
@@ -174,6 +174,10 @@ impl MutableVector for BinaryVectorBuilder {
|
||||
fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> {
|
||||
vectors::impl_extend_for_builder!(self, vector, BinaryVector, offset, length)
|
||||
}
|
||||
|
||||
fn push_null(&mut self) {
|
||||
self.mutable_array.append_null()
|
||||
}
|
||||
}
|
||||
|
||||
impl ScalarVectorBuilder for BinaryVectorBuilder {
|
||||
@@ -337,10 +341,8 @@ mod tests {
|
||||
let input = BinaryVector::from_slice(&[b"world", b"one", b"two"]);
|
||||
|
||||
let mut builder = BinaryType::default().create_mutable_vector(3);
|
||||
builder
|
||||
.push_value_ref(ValueRef::Binary("hello".as_bytes()))
|
||||
.unwrap();
|
||||
assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err());
|
||||
builder.push_value_ref(ValueRef::Binary("hello".as_bytes()));
|
||||
assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());
|
||||
builder.extend_slice_of(&input, 1, 2).unwrap();
|
||||
assert!(builder
|
||||
.extend_slice_of(&crate::vectors::Int32Vector::from_slice(&[13]), 0, 1)
|
||||
|
||||
@@ -189,7 +189,7 @@ impl MutableVector for BooleanVectorBuilder {
|
||||
Arc::new(self.finish())
|
||||
}
|
||||
|
||||
fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
|
||||
fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
|
||||
match value.as_boolean()? {
|
||||
Some(v) => self.mutable_array.append_value(v),
|
||||
None => self.mutable_array.append_null(),
|
||||
@@ -200,6 +200,10 @@ impl MutableVector for BooleanVectorBuilder {
|
||||
fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> {
|
||||
vectors::impl_extend_for_builder!(self, vector, BooleanVector, offset, length)
|
||||
}
|
||||
|
||||
fn push_null(&mut self) {
|
||||
self.mutable_array.append_null()
|
||||
}
|
||||
}
|
||||
|
||||
impl ScalarVectorBuilder for BooleanVectorBuilder {
|
||||
@@ -357,8 +361,8 @@ mod tests {
|
||||
let input = BooleanVector::from_slice(&[true, false, true]);
|
||||
|
||||
let mut builder = BooleanType::default().create_mutable_vector(3);
|
||||
builder.push_value_ref(ValueRef::Boolean(true)).unwrap();
|
||||
assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err());
|
||||
builder.push_value_ref(ValueRef::Boolean(true));
|
||||
assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());
|
||||
builder.extend_slice_of(&input, 1, 2).unwrap();
|
||||
assert!(builder
|
||||
.extend_slice_of(&crate::vectors::Int32Vector::from_slice(&[13]), 0, 1)
|
||||
|
||||
@@ -69,10 +69,8 @@ mod tests {
|
||||
let input = DateVector::from_slice(&[1, 2, 3]);
|
||||
|
||||
let mut builder = DateType::default().create_mutable_vector(3);
|
||||
builder
|
||||
.push_value_ref(ValueRef::Date(Date::new(5)))
|
||||
.unwrap();
|
||||
assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err());
|
||||
builder.push_value_ref(ValueRef::Date(Date::new(5)));
|
||||
assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());
|
||||
builder.extend_slice_of(&input, 1, 2).unwrap();
|
||||
assert!(builder
|
||||
.extend_slice_of(&crate::vectors::Int32Vector::from_slice(&[13]), 0, 1)
|
||||
|
||||
@@ -88,10 +88,8 @@ mod tests {
|
||||
]);
|
||||
|
||||
let mut builder = DateTimeType::default().create_mutable_vector(3);
|
||||
builder
|
||||
.push_value_ref(ValueRef::DateTime(DateTime::new(5)))
|
||||
.unwrap();
|
||||
assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err());
|
||||
builder.push_value_ref(ValueRef::DateTime(DateTime::new(5)));
|
||||
assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());
|
||||
builder.extend_slice_of(&input, 1, 2).unwrap();
|
||||
assert!(builder
|
||||
.extend_slice_of(&crate::vectors::Int32Vector::from_slice(&[13]), 0, 1)
|
||||
|
||||
@@ -258,14 +258,11 @@ impl ListVectorBuilder {
|
||||
self.null_buffer_builder.append(is_valid);
|
||||
}
|
||||
|
||||
fn push_null(&mut self) {
|
||||
self.finish_list(false);
|
||||
}
|
||||
|
||||
fn push_list_value(&mut self, list_value: &ListValue) -> Result<()> {
|
||||
if let Some(items) = list_value.items() {
|
||||
for item in &**items {
|
||||
self.values_builder.push_value_ref(item.as_value_ref())?;
|
||||
self.values_builder
|
||||
.try_push_value_ref(item.as_value_ref())?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -295,7 +292,7 @@ impl MutableVector for ListVectorBuilder {
|
||||
Arc::new(self.finish())
|
||||
}
|
||||
|
||||
fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
|
||||
fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
|
||||
if let Some(list_ref) = value.as_list()? {
|
||||
match list_ref {
|
||||
ListValueRef::Indexed { vector, idx } => match vector.get(idx).as_list()? {
|
||||
@@ -314,11 +311,15 @@ impl MutableVector for ListVectorBuilder {
|
||||
fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> {
|
||||
for idx in offset..offset + length {
|
||||
let value = vector.get_ref(idx);
|
||||
self.push_value_ref(value)?;
|
||||
self.try_push_value_ref(value)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn push_null(&mut self) {
|
||||
self.finish_list(false);
|
||||
}
|
||||
}
|
||||
|
||||
impl ScalarVectorBuilder for ListVectorBuilder {
|
||||
@@ -332,7 +333,7 @@ impl ScalarVectorBuilder for ListVectorBuilder {
|
||||
// We expect the input ListValue has the same inner type as the builder when using
|
||||
// push(), so just panic if `push_value_ref()` returns error, which indicate an
|
||||
// invalid input value type.
|
||||
self.push_value_ref(value.into()).unwrap_or_else(|e| {
|
||||
self.try_push_value_ref(value.into()).unwrap_or_else(|e| {
|
||||
panic!(
|
||||
"Failed to push value, expect value type {:?}, err:{}",
|
||||
self.item_type, e
|
||||
@@ -653,19 +654,17 @@ pub mod tests {
|
||||
fn test_list_vector_builder() {
|
||||
let mut builder =
|
||||
ListType::new(ConcreteDataType::int32_datatype()).create_mutable_vector(3);
|
||||
builder
|
||||
.push_value_ref(ValueRef::List(ListValueRef::Ref {
|
||||
val: &ListValue::new(
|
||||
Some(Box::new(vec![
|
||||
Value::Int32(4),
|
||||
Value::Null,
|
||||
Value::Int32(6),
|
||||
])),
|
||||
ConcreteDataType::int32_datatype(),
|
||||
),
|
||||
}))
|
||||
.unwrap();
|
||||
assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err());
|
||||
builder.push_value_ref(ValueRef::List(ListValueRef::Ref {
|
||||
val: &ListValue::new(
|
||||
Some(Box::new(vec![
|
||||
Value::Int32(4),
|
||||
Value::Null,
|
||||
Value::Int32(6),
|
||||
])),
|
||||
ConcreteDataType::int32_datatype(),
|
||||
),
|
||||
}));
|
||||
assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());
|
||||
|
||||
let data = vec![
|
||||
Some(vec![Some(1), Some(2), Some(3)]),
|
||||
|
||||
@@ -163,7 +163,7 @@ impl MutableVector for NullVectorBuilder {
|
||||
vector
|
||||
}
|
||||
|
||||
fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
|
||||
fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
|
||||
ensure!(
|
||||
value.is_null(),
|
||||
error::CastTypeSnafu {
|
||||
@@ -196,6 +196,10 @@ impl MutableVector for NullVectorBuilder {
|
||||
self.length += length;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn push_null(&mut self) {
|
||||
self.length += 1;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn replicate_null(vector: &NullVector, offsets: &[usize]) -> VectorRef {
|
||||
@@ -266,8 +270,8 @@ mod tests {
|
||||
#[test]
|
||||
fn test_null_vector_builder() {
|
||||
let mut builder = NullType::default().create_mutable_vector(3);
|
||||
builder.push_value_ref(ValueRef::Null).unwrap();
|
||||
assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err());
|
||||
builder.push_null();
|
||||
assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());
|
||||
|
||||
let input = NullVector::new(3);
|
||||
builder.extend_slice_of(&input, 1, 2).unwrap();
|
||||
|
||||
@@ -308,7 +308,7 @@ impl<T: LogicalPrimitiveType> MutableVector for PrimitiveVectorBuilder<T> {
|
||||
Arc::new(self.finish())
|
||||
}
|
||||
|
||||
fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
|
||||
fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
|
||||
let primitive = T::cast_value_ref(value)?;
|
||||
match primitive {
|
||||
Some(v) => self.mutable_array.append_value(v.into_native()),
|
||||
@@ -326,6 +326,10 @@ impl<T: LogicalPrimitiveType> MutableVector for PrimitiveVectorBuilder<T> {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn push_null(&mut self) {
|
||||
self.mutable_array.append_null()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> ScalarVectorBuilder for PrimitiveVectorBuilder<T>
|
||||
@@ -511,8 +515,8 @@ mod tests {
|
||||
#[test]
|
||||
fn test_primitive_vector_builder() {
|
||||
let mut builder = Int64Type::default().create_mutable_vector(3);
|
||||
builder.push_value_ref(ValueRef::Int64(123)).unwrap();
|
||||
assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err());
|
||||
builder.push_value_ref(ValueRef::Int64(123));
|
||||
assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());
|
||||
|
||||
let input = Int64Vector::from_slice(&[7, 8, 9]);
|
||||
builder.extend_slice_of(&input, 1, 2).unwrap();
|
||||
|
||||
@@ -203,7 +203,7 @@ impl MutableVector for StringVectorBuilder {
|
||||
Arc::new(self.finish())
|
||||
}
|
||||
|
||||
fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
|
||||
fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
|
||||
match value.as_string()? {
|
||||
Some(v) => self.mutable_array.append_value(v),
|
||||
None => self.mutable_array.append_null(),
|
||||
@@ -214,6 +214,10 @@ impl MutableVector for StringVectorBuilder {
|
||||
fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> {
|
||||
vectors::impl_extend_for_builder!(self, vector, StringVector, offset, length)
|
||||
}
|
||||
|
||||
fn push_null(&mut self) {
|
||||
self.mutable_array.append_null()
|
||||
}
|
||||
}
|
||||
|
||||
impl ScalarVectorBuilder for StringVectorBuilder {
|
||||
@@ -285,8 +289,8 @@ mod tests {
|
||||
#[test]
|
||||
fn test_string_vector_builder() {
|
||||
let mut builder = StringVectorBuilder::with_capacity(3);
|
||||
builder.push_value_ref(ValueRef::String("hello")).unwrap();
|
||||
assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err());
|
||||
builder.push_value_ref(ValueRef::String("hello"));
|
||||
assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err());
|
||||
|
||||
let input = StringVector::from_slice(&["world", "one", "two"]);
|
||||
builder.extend_slice_of(&input, 1, 2).unwrap();
|
||||
|
||||
@@ -118,7 +118,7 @@ fn add_row_to_vector(
|
||||
statements::sql_value_to_value(&column_schema.name, &column_schema.data_type, sql_val)
|
||||
.context(error::ParseSqlSnafu)?
|
||||
};
|
||||
builder.push_value_ref(value.as_value_ref()).unwrap();
|
||||
builder.push_value_ref(value.as_value_ref());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -60,7 +60,7 @@ impl ChunkReader for MockChunkReader {
|
||||
let data = self.memtable.get(&column_schema.name).unwrap();
|
||||
let mut builder = column_schema.data_type.create_mutable_vector(data.len());
|
||||
for v in data {
|
||||
builder.push_value_ref(v.as_value_ref()).unwrap();
|
||||
builder.push_value_ref(v.as_value_ref());
|
||||
}
|
||||
builder.to_vector()
|
||||
})
|
||||
|
||||
@@ -143,9 +143,7 @@ fn split_insert_request(
|
||||
.or_insert_with(|| vector.data_type().create_mutable_vector(row_num));
|
||||
val_idxs.iter().for_each(|idx| {
|
||||
// Safety: MutableVector is built according to column data type.
|
||||
builder
|
||||
.push_value_ref(vector.get(*idx).as_value_ref())
|
||||
.unwrap();
|
||||
builder.push_value_ref(vector.get(*idx).as_value_ref());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -159,7 +159,7 @@ impl PyVector {
|
||||
)));
|
||||
};
|
||||
// Safety: `pyobj_try_to_typed_val()` has checked the data type.
|
||||
buf.push_value_ref(val.as_value_ref()).unwrap();
|
||||
buf.push_value_ref(val.as_value_ref());
|
||||
}
|
||||
|
||||
Ok(PyVector {
|
||||
@@ -589,14 +589,14 @@ impl PyVector {
|
||||
// Negative step require special treatment
|
||||
for i in range.rev().step_by(step.unsigned_abs()) {
|
||||
// Safety: This mutable vector is created from the vector's data type.
|
||||
buf.push_value_ref(vector.get_ref(i)).unwrap();
|
||||
buf.push_value_ref(vector.get_ref(i));
|
||||
}
|
||||
let v: PyVector = buf.to_vector().into();
|
||||
Ok(v.into_pyobject(vm))
|
||||
} else {
|
||||
for i in range.step_by(step.unsigned_abs()) {
|
||||
// Safety: This mutable vector is created from the vector's data type.
|
||||
buf.push_value_ref(vector.get_ref(i)).unwrap();
|
||||
buf.push_value_ref(vector.get_ref(i));
|
||||
}
|
||||
let v: PyVector = buf.to_vector().into();
|
||||
Ok(v.into_pyobject(vm))
|
||||
|
||||
@@ -45,12 +45,6 @@ pub enum Error {
|
||||
source: std::io::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to convert vector, source: {}", source))]
|
||||
VectorConversion {
|
||||
#[snafu(backtrace)]
|
||||
source: datatypes::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to collect recordbatch, source: {}", source))]
|
||||
CollectRecordbatch {
|
||||
#[snafu(backtrace)]
|
||||
@@ -271,7 +265,6 @@ impl ErrorExt for Error {
|
||||
Internal { .. }
|
||||
| InternalIo { .. }
|
||||
| TokioIo { .. }
|
||||
| VectorConversion { .. }
|
||||
| CollectRecordbatch { .. }
|
||||
| StartHttp { .. }
|
||||
| StartGrpc { .. }
|
||||
|
||||
@@ -21,13 +21,10 @@ use common_time::Timestamp;
|
||||
use datatypes::data_type::DataType;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::types::{TimestampMillisecondType, TimestampType};
|
||||
use datatypes::value::{Value, ValueRef};
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{MutableVector, VectorRef};
|
||||
use snafu::ResultExt;
|
||||
use table::requests::InsertRequest;
|
||||
|
||||
use crate::error::VectorConversionSnafu;
|
||||
|
||||
type ColumnLen = usize;
|
||||
type ColumnName = String;
|
||||
|
||||
@@ -110,10 +107,7 @@ impl LineWriter {
|
||||
let or_insert = || {
|
||||
let rows = self.current_rows;
|
||||
let mut builder = datatype.create_mutable_vector(self.expected_rows);
|
||||
(0..rows)
|
||||
.try_for_each(|_| builder.push_value_ref(ValueRef::Null))
|
||||
.context(VectorConversionSnafu)
|
||||
.unwrap();
|
||||
(0..rows).for_each(|_| builder.push_null());
|
||||
(builder, rows)
|
||||
};
|
||||
let (builder, column_len) = self
|
||||
@@ -121,7 +115,7 @@ impl LineWriter {
|
||||
.entry(column_name.to_string())
|
||||
.or_insert_with(or_insert);
|
||||
|
||||
builder.push_value_ref(value.as_value_ref()).unwrap();
|
||||
builder.push_value_ref(value.as_value_ref());
|
||||
*column_len += 1;
|
||||
}
|
||||
|
||||
@@ -129,15 +123,11 @@ impl LineWriter {
|
||||
self.current_rows += 1;
|
||||
self.columns_builders
|
||||
.values_mut()
|
||||
.try_for_each(|(builder, len)| {
|
||||
.for_each(|(builder, len)| {
|
||||
if self.current_rows > *len {
|
||||
builder.push_value_ref(ValueRef::Null)
|
||||
} else {
|
||||
Ok(())
|
||||
builder.push_null()
|
||||
}
|
||||
})
|
||||
.context(VectorConversionSnafu)
|
||||
.unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
pub fn finish(self) -> InsertRequest {
|
||||
|
||||
@@ -452,10 +452,7 @@ fn rows_to_vectors<I: Iterator<Item = ConcreteDataType>, T: RowsProvider>(
|
||||
for row_idx in 0..row_num {
|
||||
let row = provider.row_by_index(row_idx);
|
||||
let value = &row[col_idx];
|
||||
builder
|
||||
.as_mut()
|
||||
.push_value_ref(value.as_value_ref())
|
||||
.unwrap();
|
||||
builder.as_mut().push_value_ref(value.as_value_ref());
|
||||
}
|
||||
|
||||
vectors.push(builder.to_vector());
|
||||
|
||||
@@ -210,7 +210,7 @@ impl BatchBuilder {
|
||||
for (builder, column) in self.builders.iter_mut().zip(batch.columns()) {
|
||||
let value = column.get_ref(i);
|
||||
builder
|
||||
.push_value_ref(value)
|
||||
.try_push_value_ref(value)
|
||||
.context(error::PushBatchSnafu)?;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user