From 7787cfdd425bcba123aa80387f87a86025e9030c Mon Sep 17 00:00:00 2001 From: Eugene Tolbakov Date: Fri, 17 Feb 2023 08:16:23 +0000 Subject: [PATCH] refactor(datatypes): enhance MutableVector methods (#987) * refactor(datatypes): enhance MutableVector methods * refactor(datatypes): address code review issues * refactor(datatypes): address more code review issues * refactor(datatypes): fix merge conflicts * refactor(datatypes): address code review issues * refactor(datatypes): address more code review issues * refactor(datatypes): update sql delete with the newly introduced method --- src/catalog/src/tables.rs | 14 ++----- src/common/grpc-expr/src/insert.rs | 20 +++------- src/datanode/src/sql/delete.rs | 2 +- src/datanode/src/sql/insert.rs | 2 +- src/datatypes/src/schema/column_schema.rs | 3 +- src/datatypes/src/schema/constraint.rs | 2 +- src/datatypes/src/vectors.rs | 19 +++++++++- src/datatypes/src/vectors/binary.rs | 12 +++--- src/datatypes/src/vectors/boolean.rs | 10 +++-- src/datatypes/src/vectors/date.rs | 6 +-- src/datatypes/src/vectors/datetime.rs | 6 +-- src/datatypes/src/vectors/list.rs | 41 ++++++++++----------- src/datatypes/src/vectors/null.rs | 10 +++-- src/datatypes/src/vectors/primitive.rs | 10 +++-- src/datatypes/src/vectors/string.rs | 10 +++-- src/frontend/src/sql.rs | 2 +- src/mito/src/table/test_util/mock_engine.rs | 2 +- src/partition/src/splitter.rs | 4 +- src/script/src/python/vector.rs | 6 +-- src/servers/src/error.rs | 7 ---- src/servers/src/line_writer.rs | 22 +++-------- src/storage/src/memtable/btree.rs | 5 +-- src/storage/src/read.rs | 2 +- 23 files changed, 104 insertions(+), 113 deletions(-) diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs index fa3f851675..e2b4a60c03 100644 --- a/src/catalog/src/tables.rs +++ b/src/catalog/src/tables.rs @@ -162,16 +162,10 @@ fn tables_to_record_batch( for table_name in table_names { // Safety: All these vectors are string type. - catalog_vec - .push_value_ref(ValueRef::String(catalog_name)) - .unwrap(); - schema_vec - .push_value_ref(ValueRef::String(schema_name)) - .unwrap(); - table_name_vec - .push_value_ref(ValueRef::String(&table_name)) - .unwrap(); - engine_vec.push_value_ref(ValueRef::String(engine)).unwrap(); + catalog_vec.push_value_ref(ValueRef::String(catalog_name)); + schema_vec.push_value_ref(ValueRef::String(schema_name)); + table_name_vec.push_value_ref(ValueRef::String(&table_name)); + engine_vec.push_value_ref(ValueRef::String(engine)); } vec![ diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index 9d4345d839..6053b8f996 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -96,9 +96,7 @@ pub fn column_to_vector(column: &Column, rows: u32) -> Result { for i in 0..rows { if let Some(true) = nulls_iter.next() { - vector - .push_value_ref(ValueRef::Null) - .context(CreateVectorSnafu)?; + vector.push_null(); } else { let value_ref = values_iter .next() @@ -109,16 +107,12 @@ pub fn column_to_vector(column: &Column, rows: u32) -> Result { ), })?; vector - .push_value_ref(value_ref) + .try_push_value_ref(value_ref) .context(CreateVectorSnafu)?; } } } else { - (0..rows).try_for_each(|_| { - vector - .push_value_ref(ValueRef::Null) - .context(CreateVectorSnafu) - })?; + (0..rows).for_each(|_| vector.push_null()); } Ok(vector.to_vector()) } @@ -324,7 +318,7 @@ fn add_values_to_builder( values.iter().try_for_each(|value| { builder - .push_value_ref(value.as_value_ref()) + .try_push_value_ref(value.as_value_ref()) .context(CreateVectorSnafu) })?; } else { @@ -337,12 +331,10 @@ fn add_values_to_builder( let mut idx_of_values = 0; for idx in 0..row_count { match is_null(&null_mask, idx) { - Some(true) => builder - .push_value_ref(ValueRef::Null) - .context(CreateVectorSnafu)?, + Some(true) => builder.push_null(), _ => { builder - .push_value_ref(values[idx_of_values].as_value_ref()) + .try_push_value_ref(values[idx_of_values].as_value_ref()) .context(CreateVectorSnafu)?; idx_of_values += 1 } diff --git a/src/datanode/src/sql/delete.rs b/src/datanode/src/sql/delete.rs index 0308c4f8e5..4e3c0015c0 100644 --- a/src/datanode/src/sql/delete.rs +++ b/src/datanode/src/sql/delete.rs @@ -124,7 +124,7 @@ fn value_to_vector(column_name: &String, sql_value: &Value, table: &TableRef) -> match value { Ok(value) => { let mut vec = data_type.create_mutable_vector(1); - if vec.push_value_ref(value.as_value_ref()).is_err() { + if vec.try_push_value_ref(value.as_value_ref()).is_err() { return InvalidSqlSnafu { msg: format!( "invalid sql, column name is {column_name}, value is {sql_value}", diff --git a/src/datanode/src/sql/insert.rs b/src/datanode/src/sql/insert.rs index 7ca7137a59..7b739bd260 100644 --- a/src/datanode/src/sql/insert.rs +++ b/src/datanode/src/sql/insert.rs @@ -143,7 +143,7 @@ fn add_row_to_vector( statements::sql_value_to_value(&column_schema.name, &column_schema.data_type, sql_val) .context(ParseSqlSnafu)? }; - builder.push_value_ref(value.as_value_ref()).unwrap(); + builder.push_value_ref(value.as_value_ref()); Ok(()) } diff --git a/src/datatypes/src/schema/column_schema.rs b/src/datatypes/src/schema/column_schema.rs index 150ccc3420..02a8ad0093 100644 --- a/src/datatypes/src/schema/column_schema.rs +++ b/src/datatypes/src/schema/column_schema.rs @@ -145,8 +145,7 @@ impl ColumnSchema { let value_ref = padding_value.as_value_ref(); let mut mutable_vector = self.data_type.create_mutable_vector(num_rows); for _ in 0..num_rows { - // Safety: Both the vector and default value are created by the data type. - mutable_vector.push_value_ref(value_ref).unwrap(); + mutable_vector.push_value_ref(value_ref); } mutable_vector.to_vector() } diff --git a/src/datatypes/src/schema/constraint.rs b/src/datatypes/src/schema/constraint.rs index 9c9557a739..63a1a3e078 100644 --- a/src/datatypes/src/schema/constraint.rs +++ b/src/datatypes/src/schema/constraint.rs @@ -143,7 +143,7 @@ impl ColumnDefaultConstraint { // attempt to downcast the vector fail if they don't check whether the vector is const // first. let mut mutable_vector = data_type.create_mutable_vector(1); - mutable_vector.push_value_ref(v.as_value_ref())?; + mutable_vector.try_push_value_ref(v.as_value_ref())?; let base_vector = mutable_vector.to_vector(); Ok(base_vector.replicate(&[num_rows])) } diff --git a/src/datatypes/src/vectors.rs b/src/datatypes/src/vectors.rs index 98dbcc8af9..6987eb877c 100644 --- a/src/datatypes/src/vectors.rs +++ b/src/datatypes/src/vectors.rs @@ -172,10 +172,25 @@ pub trait MutableVector: Send + Sync { /// Convert `self` to an (immutable) [VectorRef] and reset `self`. fn to_vector(&mut self) -> VectorRef; + /// Try to push value ref to this mutable vector. + fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()>; + /// Push value ref to this mutable vector. /// - /// Returns error if data types mismatch. - fn push_value_ref(&mut self, value: ValueRef) -> Result<()>; + /// # Panics + /// Panics if error if data types mismatch. + fn push_value_ref(&mut self, value: ValueRef) { + self.try_push_value_ref(value).unwrap_or_else(|_| { + panic!( + "expecting pushing value of datatype {:?}, actual {:?}", + self.data_type(), + value + ); + }); + } + + // Push null to this mutable vector. + fn push_null(&mut self); /// Extend this mutable vector by slice of `vector`. /// diff --git a/src/datatypes/src/vectors/binary.rs b/src/datatypes/src/vectors/binary.rs index 53d5b1d3e8..5d7dc6c5f2 100644 --- a/src/datatypes/src/vectors/binary.rs +++ b/src/datatypes/src/vectors/binary.rs @@ -163,7 +163,7 @@ impl MutableVector for BinaryVectorBuilder { Arc::new(self.finish()) } - fn push_value_ref(&mut self, value: ValueRef) -> Result<()> { + fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> { match value.as_binary()? { Some(v) => self.mutable_array.append_value(v), None => self.mutable_array.append_null(), @@ -174,6 +174,10 @@ impl MutableVector for BinaryVectorBuilder { fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> { vectors::impl_extend_for_builder!(self, vector, BinaryVector, offset, length) } + + fn push_null(&mut self) { + self.mutable_array.append_null() + } } impl ScalarVectorBuilder for BinaryVectorBuilder { @@ -337,10 +341,8 @@ mod tests { let input = BinaryVector::from_slice(&[b"world", b"one", b"two"]); let mut builder = BinaryType::default().create_mutable_vector(3); - builder - .push_value_ref(ValueRef::Binary("hello".as_bytes())) - .unwrap(); - assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err()); + builder.push_value_ref(ValueRef::Binary("hello".as_bytes())); + assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err()); builder.extend_slice_of(&input, 1, 2).unwrap(); assert!(builder .extend_slice_of(&crate::vectors::Int32Vector::from_slice(&[13]), 0, 1) diff --git a/src/datatypes/src/vectors/boolean.rs b/src/datatypes/src/vectors/boolean.rs index 788b4de420..f9ad873fa3 100644 --- a/src/datatypes/src/vectors/boolean.rs +++ b/src/datatypes/src/vectors/boolean.rs @@ -189,7 +189,7 @@ impl MutableVector for BooleanVectorBuilder { Arc::new(self.finish()) } - fn push_value_ref(&mut self, value: ValueRef) -> Result<()> { + fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> { match value.as_boolean()? { Some(v) => self.mutable_array.append_value(v), None => self.mutable_array.append_null(), @@ -200,6 +200,10 @@ impl MutableVector for BooleanVectorBuilder { fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> { vectors::impl_extend_for_builder!(self, vector, BooleanVector, offset, length) } + + fn push_null(&mut self) { + self.mutable_array.append_null() + } } impl ScalarVectorBuilder for BooleanVectorBuilder { @@ -357,8 +361,8 @@ mod tests { let input = BooleanVector::from_slice(&[true, false, true]); let mut builder = BooleanType::default().create_mutable_vector(3); - builder.push_value_ref(ValueRef::Boolean(true)).unwrap(); - assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err()); + builder.push_value_ref(ValueRef::Boolean(true)); + assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err()); builder.extend_slice_of(&input, 1, 2).unwrap(); assert!(builder .extend_slice_of(&crate::vectors::Int32Vector::from_slice(&[13]), 0, 1) diff --git a/src/datatypes/src/vectors/date.rs b/src/datatypes/src/vectors/date.rs index 22c60fbae9..5118c6d8a5 100644 --- a/src/datatypes/src/vectors/date.rs +++ b/src/datatypes/src/vectors/date.rs @@ -69,10 +69,8 @@ mod tests { let input = DateVector::from_slice(&[1, 2, 3]); let mut builder = DateType::default().create_mutable_vector(3); - builder - .push_value_ref(ValueRef::Date(Date::new(5))) - .unwrap(); - assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err()); + builder.push_value_ref(ValueRef::Date(Date::new(5))); + assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err()); builder.extend_slice_of(&input, 1, 2).unwrap(); assert!(builder .extend_slice_of(&crate::vectors::Int32Vector::from_slice(&[13]), 0, 1) diff --git a/src/datatypes/src/vectors/datetime.rs b/src/datatypes/src/vectors/datetime.rs index dae8557096..18bec2c5a6 100644 --- a/src/datatypes/src/vectors/datetime.rs +++ b/src/datatypes/src/vectors/datetime.rs @@ -88,10 +88,8 @@ mod tests { ]); let mut builder = DateTimeType::default().create_mutable_vector(3); - builder - .push_value_ref(ValueRef::DateTime(DateTime::new(5))) - .unwrap(); - assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err()); + builder.push_value_ref(ValueRef::DateTime(DateTime::new(5))); + assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err()); builder.extend_slice_of(&input, 1, 2).unwrap(); assert!(builder .extend_slice_of(&crate::vectors::Int32Vector::from_slice(&[13]), 0, 1) diff --git a/src/datatypes/src/vectors/list.rs b/src/datatypes/src/vectors/list.rs index 8f9c6ef8e5..7073049fc9 100644 --- a/src/datatypes/src/vectors/list.rs +++ b/src/datatypes/src/vectors/list.rs @@ -258,14 +258,11 @@ impl ListVectorBuilder { self.null_buffer_builder.append(is_valid); } - fn push_null(&mut self) { - self.finish_list(false); - } - fn push_list_value(&mut self, list_value: &ListValue) -> Result<()> { if let Some(items) = list_value.items() { for item in &**items { - self.values_builder.push_value_ref(item.as_value_ref())?; + self.values_builder + .try_push_value_ref(item.as_value_ref())?; } } @@ -295,7 +292,7 @@ impl MutableVector for ListVectorBuilder { Arc::new(self.finish()) } - fn push_value_ref(&mut self, value: ValueRef) -> Result<()> { + fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> { if let Some(list_ref) = value.as_list()? { match list_ref { ListValueRef::Indexed { vector, idx } => match vector.get(idx).as_list()? { @@ -314,11 +311,15 @@ impl MutableVector for ListVectorBuilder { fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> { for idx in offset..offset + length { let value = vector.get_ref(idx); - self.push_value_ref(value)?; + self.try_push_value_ref(value)?; } Ok(()) } + + fn push_null(&mut self) { + self.finish_list(false); + } } impl ScalarVectorBuilder for ListVectorBuilder { @@ -332,7 +333,7 @@ impl ScalarVectorBuilder for ListVectorBuilder { // We expect the input ListValue has the same inner type as the builder when using // push(), so just panic if `push_value_ref()` returns error, which indicate an // invalid input value type. - self.push_value_ref(value.into()).unwrap_or_else(|e| { + self.try_push_value_ref(value.into()).unwrap_or_else(|e| { panic!( "Failed to push value, expect value type {:?}, err:{}", self.item_type, e @@ -653,19 +654,17 @@ pub mod tests { fn test_list_vector_builder() { let mut builder = ListType::new(ConcreteDataType::int32_datatype()).create_mutable_vector(3); - builder - .push_value_ref(ValueRef::List(ListValueRef::Ref { - val: &ListValue::new( - Some(Box::new(vec![ - Value::Int32(4), - Value::Null, - Value::Int32(6), - ])), - ConcreteDataType::int32_datatype(), - ), - })) - .unwrap(); - assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err()); + builder.push_value_ref(ValueRef::List(ListValueRef::Ref { + val: &ListValue::new( + Some(Box::new(vec![ + Value::Int32(4), + Value::Null, + Value::Int32(6), + ])), + ConcreteDataType::int32_datatype(), + ), + })); + assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err()); let data = vec![ Some(vec![Some(1), Some(2), Some(3)]), diff --git a/src/datatypes/src/vectors/null.rs b/src/datatypes/src/vectors/null.rs index e754335bdc..d1c4090c92 100644 --- a/src/datatypes/src/vectors/null.rs +++ b/src/datatypes/src/vectors/null.rs @@ -163,7 +163,7 @@ impl MutableVector for NullVectorBuilder { vector } - fn push_value_ref(&mut self, value: ValueRef) -> Result<()> { + fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> { ensure!( value.is_null(), error::CastTypeSnafu { @@ -196,6 +196,10 @@ impl MutableVector for NullVectorBuilder { self.length += length; Ok(()) } + + fn push_null(&mut self) { + self.length += 1; + } } pub(crate) fn replicate_null(vector: &NullVector, offsets: &[usize]) -> VectorRef { @@ -266,8 +270,8 @@ mod tests { #[test] fn test_null_vector_builder() { let mut builder = NullType::default().create_mutable_vector(3); - builder.push_value_ref(ValueRef::Null).unwrap(); - assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err()); + builder.push_null(); + assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err()); let input = NullVector::new(3); builder.extend_slice_of(&input, 1, 2).unwrap(); diff --git a/src/datatypes/src/vectors/primitive.rs b/src/datatypes/src/vectors/primitive.rs index d797cf2d2b..f0a21605f0 100644 --- a/src/datatypes/src/vectors/primitive.rs +++ b/src/datatypes/src/vectors/primitive.rs @@ -308,7 +308,7 @@ impl MutableVector for PrimitiveVectorBuilder { Arc::new(self.finish()) } - fn push_value_ref(&mut self, value: ValueRef) -> Result<()> { + fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> { let primitive = T::cast_value_ref(value)?; match primitive { Some(v) => self.mutable_array.append_value(v.into_native()), @@ -326,6 +326,10 @@ impl MutableVector for PrimitiveVectorBuilder { } Ok(()) } + + fn push_null(&mut self) { + self.mutable_array.append_null() + } } impl ScalarVectorBuilder for PrimitiveVectorBuilder @@ -511,8 +515,8 @@ mod tests { #[test] fn test_primitive_vector_builder() { let mut builder = Int64Type::default().create_mutable_vector(3); - builder.push_value_ref(ValueRef::Int64(123)).unwrap(); - assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err()); + builder.push_value_ref(ValueRef::Int64(123)); + assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err()); let input = Int64Vector::from_slice(&[7, 8, 9]); builder.extend_slice_of(&input, 1, 2).unwrap(); diff --git a/src/datatypes/src/vectors/string.rs b/src/datatypes/src/vectors/string.rs index 3a7bf7cb11..dced05199d 100644 --- a/src/datatypes/src/vectors/string.rs +++ b/src/datatypes/src/vectors/string.rs @@ -203,7 +203,7 @@ impl MutableVector for StringVectorBuilder { Arc::new(self.finish()) } - fn push_value_ref(&mut self, value: ValueRef) -> Result<()> { + fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> { match value.as_string()? { Some(v) => self.mutable_array.append_value(v), None => self.mutable_array.append_null(), @@ -214,6 +214,10 @@ impl MutableVector for StringVectorBuilder { fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> { vectors::impl_extend_for_builder!(self, vector, StringVector, offset, length) } + + fn push_null(&mut self) { + self.mutable_array.append_null() + } } impl ScalarVectorBuilder for StringVectorBuilder { @@ -285,8 +289,8 @@ mod tests { #[test] fn test_string_vector_builder() { let mut builder = StringVectorBuilder::with_capacity(3); - builder.push_value_ref(ValueRef::String("hello")).unwrap(); - assert!(builder.push_value_ref(ValueRef::Int32(123)).is_err()); + builder.push_value_ref(ValueRef::String("hello")); + assert!(builder.try_push_value_ref(ValueRef::Int32(123)).is_err()); let input = StringVector::from_slice(&["world", "one", "two"]); builder.extend_slice_of(&input, 1, 2).unwrap(); diff --git a/src/frontend/src/sql.rs b/src/frontend/src/sql.rs index 6ce9d60dd7..d1f74e0221 100644 --- a/src/frontend/src/sql.rs +++ b/src/frontend/src/sql.rs @@ -118,7 +118,7 @@ fn add_row_to_vector( statements::sql_value_to_value(&column_schema.name, &column_schema.data_type, sql_val) .context(error::ParseSqlSnafu)? }; - builder.push_value_ref(value.as_value_ref()).unwrap(); + builder.push_value_ref(value.as_value_ref()); Ok(()) } diff --git a/src/mito/src/table/test_util/mock_engine.rs b/src/mito/src/table/test_util/mock_engine.rs index af83d927e8..7725a955a9 100644 --- a/src/mito/src/table/test_util/mock_engine.rs +++ b/src/mito/src/table/test_util/mock_engine.rs @@ -60,7 +60,7 @@ impl ChunkReader for MockChunkReader { let data = self.memtable.get(&column_schema.name).unwrap(); let mut builder = column_schema.data_type.create_mutable_vector(data.len()); for v in data { - builder.push_value_ref(v.as_value_ref()).unwrap(); + builder.push_value_ref(v.as_value_ref()); } builder.to_vector() }) diff --git a/src/partition/src/splitter.rs b/src/partition/src/splitter.rs index c8a83098c1..6b7bc05561 100644 --- a/src/partition/src/splitter.rs +++ b/src/partition/src/splitter.rs @@ -143,9 +143,7 @@ fn split_insert_request( .or_insert_with(|| vector.data_type().create_mutable_vector(row_num)); val_idxs.iter().for_each(|idx| { // Safety: MutableVector is built according to column data type. - builder - .push_value_ref(vector.get(*idx).as_value_ref()) - .unwrap(); + builder.push_value_ref(vector.get(*idx).as_value_ref()); }); } } diff --git a/src/script/src/python/vector.rs b/src/script/src/python/vector.rs index 24c7aa6976..0c12dbe447 100644 --- a/src/script/src/python/vector.rs +++ b/src/script/src/python/vector.rs @@ -159,7 +159,7 @@ impl PyVector { ))); }; // Safety: `pyobj_try_to_typed_val()` has checked the data type. - buf.push_value_ref(val.as_value_ref()).unwrap(); + buf.push_value_ref(val.as_value_ref()); } Ok(PyVector { @@ -589,14 +589,14 @@ impl PyVector { // Negative step require special treatment for i in range.rev().step_by(step.unsigned_abs()) { // Safety: This mutable vector is created from the vector's data type. - buf.push_value_ref(vector.get_ref(i)).unwrap(); + buf.push_value_ref(vector.get_ref(i)); } let v: PyVector = buf.to_vector().into(); Ok(v.into_pyobject(vm)) } else { for i in range.step_by(step.unsigned_abs()) { // Safety: This mutable vector is created from the vector's data type. - buf.push_value_ref(vector.get_ref(i)).unwrap(); + buf.push_value_ref(vector.get_ref(i)); } let v: PyVector = buf.to_vector().into(); Ok(v.into_pyobject(vm)) diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index de1e1a2f7d..b5325d37a9 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -45,12 +45,6 @@ pub enum Error { source: std::io::Error, }, - #[snafu(display("Failed to convert vector, source: {}", source))] - VectorConversion { - #[snafu(backtrace)] - source: datatypes::error::Error, - }, - #[snafu(display("Failed to collect recordbatch, source: {}", source))] CollectRecordbatch { #[snafu(backtrace)] @@ -271,7 +265,6 @@ impl ErrorExt for Error { Internal { .. } | InternalIo { .. } | TokioIo { .. } - | VectorConversion { .. } | CollectRecordbatch { .. } | StartHttp { .. } | StartGrpc { .. } diff --git a/src/servers/src/line_writer.rs b/src/servers/src/line_writer.rs index 96f59da063..7da1de6bb8 100644 --- a/src/servers/src/line_writer.rs +++ b/src/servers/src/line_writer.rs @@ -21,13 +21,10 @@ use common_time::Timestamp; use datatypes::data_type::DataType; use datatypes::prelude::ConcreteDataType; use datatypes::types::{TimestampMillisecondType, TimestampType}; -use datatypes::value::{Value, ValueRef}; +use datatypes::value::Value; use datatypes::vectors::{MutableVector, VectorRef}; -use snafu::ResultExt; use table::requests::InsertRequest; -use crate::error::VectorConversionSnafu; - type ColumnLen = usize; type ColumnName = String; @@ -110,10 +107,7 @@ impl LineWriter { let or_insert = || { let rows = self.current_rows; let mut builder = datatype.create_mutable_vector(self.expected_rows); - (0..rows) - .try_for_each(|_| builder.push_value_ref(ValueRef::Null)) - .context(VectorConversionSnafu) - .unwrap(); + (0..rows).for_each(|_| builder.push_null()); (builder, rows) }; let (builder, column_len) = self @@ -121,7 +115,7 @@ impl LineWriter { .entry(column_name.to_string()) .or_insert_with(or_insert); - builder.push_value_ref(value.as_value_ref()).unwrap(); + builder.push_value_ref(value.as_value_ref()); *column_len += 1; } @@ -129,15 +123,11 @@ impl LineWriter { self.current_rows += 1; self.columns_builders .values_mut() - .try_for_each(|(builder, len)| { + .for_each(|(builder, len)| { if self.current_rows > *len { - builder.push_value_ref(ValueRef::Null) - } else { - Ok(()) + builder.push_null() } - }) - .context(VectorConversionSnafu) - .unwrap(); + }); } pub fn finish(self) -> InsertRequest { diff --git a/src/storage/src/memtable/btree.rs b/src/storage/src/memtable/btree.rs index bb7a57a2df..153aaf77c9 100644 --- a/src/storage/src/memtable/btree.rs +++ b/src/storage/src/memtable/btree.rs @@ -452,10 +452,7 @@ fn rows_to_vectors, T: RowsProvider>( for row_idx in 0..row_num { let row = provider.row_by_index(row_idx); let value = &row[col_idx]; - builder - .as_mut() - .push_value_ref(value.as_value_ref()) - .unwrap(); + builder.as_mut().push_value_ref(value.as_value_ref()); } vectors.push(builder.to_vector()); diff --git a/src/storage/src/read.rs b/src/storage/src/read.rs index ebc1f3fe6e..7a84223a32 100644 --- a/src/storage/src/read.rs +++ b/src/storage/src/read.rs @@ -210,7 +210,7 @@ impl BatchBuilder { for (builder, column) in self.builders.iter_mut().zip(batch.columns()) { let value = column.get_ref(i); builder - .push_value_ref(value) + .try_push_value_ref(value) .context(error::PushBatchSnafu)?; }