mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-23 16:30:39 +00:00
chore: change binary array type from LargeBinaryArray to BinaryArray (#3924)
* chore: change binary array type from LargeBinaryArray to BinaryArray * fix: adjust try_into_vector logic * fix: apply CR suggestions, add tests * chore: fix failing test * chore: fix integration test * chore: adjust the assertions according to changed implementation * chore: add a test with LargeBinary type * chore: apply CR suggestions * chore: simplify tests
This commit is contained in:
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub type BinaryArray = arrow::array::LargeBinaryArray;
|
||||
pub type MutableBinaryArray = arrow::array::LargeBinaryBuilder;
|
||||
pub type BinaryArray = arrow::array::BinaryArray;
|
||||
pub type MutableBinaryArray = arrow::array::BinaryBuilder;
|
||||
pub type StringArray = arrow::array::StringArray;
|
||||
pub type MutableStringArray = arrow::array::StringBuilder;
|
||||
|
||||
@@ -47,7 +47,7 @@ impl DataType for BinaryType {
|
||||
}
|
||||
|
||||
fn as_arrow_type(&self) -> ArrowDataType {
|
||||
ArrowDataType::LargeBinary
|
||||
ArrowDataType::Binary
|
||||
}
|
||||
|
||||
fn create_mutable_vector(&self, capacity: usize) -> Box<dyn MutableVector> {
|
||||
|
||||
@@ -342,7 +342,7 @@ impl Value {
|
||||
Value::Float32(v) => ScalarValue::Float32(Some(v.0)),
|
||||
Value::Float64(v) => ScalarValue::Float64(Some(v.0)),
|
||||
Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())),
|
||||
Value::Binary(v) => ScalarValue::LargeBinary(Some(v.to_vec())),
|
||||
Value::Binary(v) => ScalarValue::Binary(Some(v.to_vec())),
|
||||
Value::Date(v) => ScalarValue::Date32(Some(v.val())),
|
||||
Value::DateTime(v) => ScalarValue::Date64(Some(v.val())),
|
||||
Value::Null => to_null_scalar_value(output_type)?,
|
||||
@@ -413,7 +413,7 @@ pub fn to_null_scalar_value(output_type: &ConcreteDataType) -> Result<ScalarValu
|
||||
ConcreteDataType::UInt64(_) => ScalarValue::UInt64(None),
|
||||
ConcreteDataType::Float32(_) => ScalarValue::Float32(None),
|
||||
ConcreteDataType::Float64(_) => ScalarValue::Float64(None),
|
||||
ConcreteDataType::Binary(_) => ScalarValue::LargeBinary(None),
|
||||
ConcreteDataType::Binary(_) => ScalarValue::Binary(None),
|
||||
ConcreteDataType::String(_) => ScalarValue::Utf8(None),
|
||||
ConcreteDataType::Date(_) => ScalarValue::Date32(None),
|
||||
ConcreteDataType::DateTime(_) => ScalarValue::Date64(None),
|
||||
@@ -2105,7 +2105,7 @@ mod tests {
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::LargeBinary(Some("world".as_bytes().to_vec())),
|
||||
ScalarValue::Binary(Some("world".as_bytes().to_vec())),
|
||||
Value::Binary(Bytes::from("world".as_bytes()))
|
||||
.try_to_scalar_value(&ConcreteDataType::binary_datatype())
|
||||
.unwrap()
|
||||
@@ -2187,7 +2187,7 @@ mod tests {
|
||||
.unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
ScalarValue::LargeBinary(None),
|
||||
ScalarValue::Binary(None),
|
||||
Value::Null
|
||||
.try_to_scalar_value(&ConcreteDataType::binary_datatype())
|
||||
.unwrap()
|
||||
|
||||
@@ -52,6 +52,14 @@ impl From<Vec<Option<Vec<u8>>>> for BinaryVector {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Vec<&[u8]>> for BinaryVector {
|
||||
fn from(data: Vec<&[u8]>) -> Self {
|
||||
Self {
|
||||
array: BinaryArray::from_iter_values(data),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Vector for BinaryVector {
|
||||
fn data_type(&self) -> ConcreteDataType {
|
||||
ConcreteDataType::binary_datatype()
|
||||
@@ -257,7 +265,7 @@ mod tests {
|
||||
|
||||
let arrow_arr = v.to_arrow_array();
|
||||
assert_eq!(2, arrow_arr.len());
|
||||
assert_eq!(&ArrowDataType::LargeBinary, arrow_arr.data_type());
|
||||
assert_eq!(&ArrowDataType::Binary, arrow_arr.data_type());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -258,9 +258,9 @@ impl Helper {
|
||||
Ok(match array.as_ref().data_type() {
|
||||
ArrowDataType::Null => Arc::new(NullVector::try_from_arrow_array(array)?),
|
||||
ArrowDataType::Boolean => Arc::new(BooleanVector::try_from_arrow_array(array)?),
|
||||
ArrowDataType::LargeBinary => Arc::new(BinaryVector::try_from_arrow_array(array)?),
|
||||
ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Binary => {
|
||||
let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::LargeBinary)
|
||||
ArrowDataType::Binary => Arc::new(BinaryVector::try_from_arrow_array(array)?),
|
||||
ArrowDataType::LargeBinary | ArrowDataType::FixedSizeBinary(_) => {
|
||||
let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::Binary)
|
||||
.context(crate::error::ArrowComputeSnafu)?;
|
||||
Arc::new(BinaryVector::try_from_arrow_array(array)?)
|
||||
}
|
||||
@@ -278,7 +278,7 @@ impl Helper {
|
||||
ArrowDataType::LargeUtf8 => {
|
||||
let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::Utf8)
|
||||
.context(crate::error::ArrowComputeSnafu)?;
|
||||
Arc::new(BinaryVector::try_from_arrow_array(array)?)
|
||||
Arc::new(StringVector::try_from_arrow_array(array)?)
|
||||
}
|
||||
ArrowDataType::Date32 => Arc::new(DateVector::try_from_arrow_array(array)?),
|
||||
ArrowDataType::Date64 => Arc::new(DateTimeVector::try_from_arrow_array(array)?),
|
||||
@@ -402,8 +402,10 @@ mod tests {
|
||||
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
|
||||
TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
|
||||
};
|
||||
use arrow::buffer::Buffer;
|
||||
use arrow::datatypes::Int32Type;
|
||||
use arrow_array::DictionaryArray;
|
||||
use arrow_array::{BinaryArray, DictionaryArray, FixedSizeBinaryArray, LargeStringArray};
|
||||
use arrow_schema::DataType;
|
||||
use common_decimal::Decimal128;
|
||||
use common_time::time::Time;
|
||||
use common_time::timestamp::TimeUnit;
|
||||
@@ -576,10 +578,6 @@ mod tests {
|
||||
fn test_try_into_vector() {
|
||||
check_try_into_vector(NullArray::new(2));
|
||||
check_try_into_vector(BooleanArray::from(vec![true, false]));
|
||||
check_try_into_vector(LargeBinaryArray::from(vec![
|
||||
"hello".as_bytes(),
|
||||
"world".as_bytes(),
|
||||
]));
|
||||
check_try_into_vector(Int8Array::from(vec![1, 2, 3]));
|
||||
check_try_into_vector(Int16Array::from(vec![1, 2, 3]));
|
||||
check_try_into_vector(Int32Array::from(vec![1, 2, 3]));
|
||||
@@ -611,6 +609,52 @@ mod tests {
|
||||
Helper::try_into_vector(array).unwrap_err();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_try_binary_array_into_vector() {
|
||||
let input_vec: Vec<&[u8]> = vec!["hello".as_bytes(), "world".as_bytes()];
|
||||
let assertion_vector = BinaryVector::from(input_vec.clone());
|
||||
|
||||
let input_arrays: Vec<ArrayRef> = vec![
|
||||
Arc::new(LargeBinaryArray::from(input_vec.clone())) as ArrayRef,
|
||||
Arc::new(BinaryArray::from(input_vec.clone())) as ArrayRef,
|
||||
Arc::new(FixedSizeBinaryArray::new(
|
||||
5,
|
||||
Buffer::from_vec("helloworld".as_bytes().to_vec()),
|
||||
None,
|
||||
)) as ArrayRef,
|
||||
];
|
||||
|
||||
for input_array in input_arrays {
|
||||
let vector = Helper::try_into_vector(input_array).unwrap();
|
||||
|
||||
assert_eq!(2, vector.len());
|
||||
assert_eq!(0, vector.null_count());
|
||||
|
||||
let output_arrow_array: ArrayRef = vector.to_arrow_array();
|
||||
assert_eq!(&DataType::Binary, output_arrow_array.data_type());
|
||||
assert_eq!(&assertion_vector.to_arrow_array(), &output_arrow_array);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_large_string_array_into_vector() {
|
||||
let input_vec = vec!["a", "b"];
|
||||
let assertion_array = StringArray::from(input_vec.clone());
|
||||
|
||||
let large_string_array: ArrayRef = Arc::new(LargeStringArray::from(input_vec));
|
||||
let vector = Helper::try_into_vector(large_string_array).unwrap();
|
||||
assert_eq!(2, vector.len());
|
||||
assert_eq!(0, vector.null_count());
|
||||
|
||||
let output_arrow_array: StringArray = vector
|
||||
.to_arrow_array()
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.unwrap()
|
||||
.clone();
|
||||
assert_eq!(&assertion_array, &output_arrow_array);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_try_from_scalar_time_value() {
|
||||
let vector = Helper::try_from_scalar_value(ScalarValue::Time32Second(Some(42)), 3).unwrap();
|
||||
|
||||
@@ -281,10 +281,6 @@ impl Drop for KeyDict {
|
||||
|
||||
/// Buffer to store unsorted primary keys.
|
||||
struct KeyBuffer {
|
||||
// We use arrow's binary builder as out default binary builder
|
||||
// is LargeBinaryBuilder
|
||||
// TODO(yingwen): Change the type binary vector to Binary instead of LargeBinary.
|
||||
/// Builder for binary key array.
|
||||
key_builder: BinaryBuilder,
|
||||
next_pk_index: usize,
|
||||
}
|
||||
|
||||
@@ -79,19 +79,28 @@ pub struct SstInfo {
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_datasource::file_format::parquet::BufferedWriter;
|
||||
use common_time::Timestamp;
|
||||
use datafusion_common::{Column, ScalarValue};
|
||||
use datafusion_expr::{BinaryExpr, Expr, Operator};
|
||||
use datatypes::arrow;
|
||||
use datatypes::arrow::array::RecordBatch;
|
||||
use datatypes::arrow::datatypes::{DataType, Field, Schema};
|
||||
use parquet::basic::{Compression, Encoding, ZstdLevel};
|
||||
use parquet::file::metadata::KeyValue;
|
||||
use parquet::file::properties::WriterProperties;
|
||||
use table::predicate::Predicate;
|
||||
|
||||
use super::*;
|
||||
use crate::cache::{CacheManager, PageKey};
|
||||
use crate::sst::index::Indexer;
|
||||
use crate::sst::parquet::format::WriteFormat;
|
||||
use crate::sst::parquet::reader::ParquetReaderBuilder;
|
||||
use crate::sst::parquet::writer::ParquetWriter;
|
||||
use crate::sst::DEFAULT_WRITE_CONCURRENCY;
|
||||
use crate::test_util::sst_util::{
|
||||
assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle,
|
||||
sst_region_metadata,
|
||||
assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
|
||||
new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
|
||||
};
|
||||
use crate::test_util::{check_reader_result, TestEnv};
|
||||
|
||||
@@ -399,4 +408,92 @@ mod tests {
|
||||
let mut reader = builder.build().await.unwrap();
|
||||
check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_read_large_binary() {
|
||||
let mut env = TestEnv::new();
|
||||
let object_store = env.init_object_store_manager();
|
||||
let handle = sst_file_handle(0, 1000);
|
||||
let file_path = handle.file_path(FILE_DIR);
|
||||
|
||||
let write_opts = WriteOptions {
|
||||
row_group_size: 50,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let metadata = build_test_binary_test_region_metadata();
|
||||
let json = metadata.to_json().unwrap();
|
||||
let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
|
||||
|
||||
let props_builder = WriterProperties::builder()
|
||||
.set_key_value_metadata(Some(vec![key_value_meta]))
|
||||
.set_compression(Compression::ZSTD(ZstdLevel::default()))
|
||||
.set_encoding(Encoding::PLAIN)
|
||||
.set_max_row_group_size(write_opts.row_group_size);
|
||||
|
||||
let writer_props = props_builder.build();
|
||||
|
||||
let write_format = WriteFormat::new(metadata);
|
||||
let fields: Vec<_> = write_format
|
||||
.arrow_schema()
|
||||
.fields()
|
||||
.into_iter()
|
||||
.map(|field| {
|
||||
let data_type = field.data_type().clone();
|
||||
if data_type == DataType::Binary {
|
||||
Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
|
||||
} else {
|
||||
Field::new(field.name(), data_type, field.is_nullable())
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let arrow_schema = Arc::new(Schema::new(fields));
|
||||
|
||||
// Ensures field_0 has LargeBinary type.
|
||||
assert_eq!(
|
||||
&DataType::LargeBinary,
|
||||
arrow_schema.field_with_name("field_0").unwrap().data_type()
|
||||
);
|
||||
let mut buffered_writer = BufferedWriter::try_new(
|
||||
file_path.clone(),
|
||||
object_store.clone(),
|
||||
arrow_schema.clone(),
|
||||
Some(writer_props),
|
||||
write_opts.write_buffer_size.as_bytes() as usize,
|
||||
DEFAULT_WRITE_CONCURRENCY,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let batch = new_batch_with_binary(&["a"], 0, 60);
|
||||
let arrow_batch = write_format.convert_batch(&batch).unwrap();
|
||||
let arrays: Vec<_> = arrow_batch
|
||||
.columns()
|
||||
.iter()
|
||||
.map(|array| {
|
||||
let data_type = array.data_type().clone();
|
||||
if data_type == DataType::Binary {
|
||||
arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
|
||||
} else {
|
||||
array.clone()
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();
|
||||
|
||||
buffered_writer.write(&result).await.unwrap();
|
||||
buffered_writer.close().await.unwrap();
|
||||
|
||||
let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
|
||||
let mut reader = builder.build().await.unwrap();
|
||||
check_reader_result(
|
||||
&mut reader,
|
||||
&[
|
||||
new_batch_with_binary(&["a"], 0, 50),
|
||||
new_batch_with_binary(&["a"], 50, 60),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,14 +18,17 @@ use std::sync::Arc;
|
||||
|
||||
use api::v1::{OpType, SemanticType};
|
||||
use common_time::Timestamp;
|
||||
use datatypes::arrow::array::{BinaryArray, TimestampMillisecondArray, UInt64Array, UInt8Array};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use datatypes::value::ValueRef;
|
||||
use parquet::file::metadata::ParquetMetaData;
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
|
||||
use store_api::metadata::{
|
||||
ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
|
||||
};
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::read::{Batch, Source};
|
||||
use crate::read::{Batch, BatchBuilder, Source};
|
||||
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
|
||||
use crate::sst::file::{FileHandle, FileId, FileMeta};
|
||||
use crate::test_util::{new_batch_builder, new_noop_file_purger, VecBatchReader};
|
||||
@@ -128,6 +131,36 @@ pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub fn new_batch_with_binary(tags: &[&str], start: usize, end: usize) -> Batch {
|
||||
assert!(end >= start);
|
||||
let pk = new_primary_key(tags);
|
||||
let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
|
||||
let sequences = vec![1000; end - start];
|
||||
let op_types = vec![OpType::Put; end - start];
|
||||
|
||||
let field: Vec<_> = (start..end)
|
||||
.map(|_v| "some data".as_bytes().to_vec())
|
||||
.collect();
|
||||
|
||||
let mut builder = BatchBuilder::new(pk);
|
||||
builder
|
||||
.timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
|
||||
timestamps.iter().copied(),
|
||||
)))
|
||||
.unwrap()
|
||||
.sequences_array(Arc::new(UInt64Array::from_iter_values(
|
||||
sequences.iter().copied(),
|
||||
)))
|
||||
.unwrap()
|
||||
.op_types_array(Arc::new(UInt8Array::from_iter_values(
|
||||
op_types.iter().map(|v| *v as u8),
|
||||
)))
|
||||
.unwrap()
|
||||
.push_field_array(1, Arc::new(BinaryArray::from_iter_values(field)))
|
||||
.unwrap();
|
||||
builder.build().unwrap()
|
||||
}
|
||||
|
||||
/// ParquetMetaData doesn't implement `PartialEq` trait, check internal fields manually
|
||||
pub fn assert_parquet_metadata_eq(a: Arc<ParquetMetaData>, b: Arc<ParquetMetaData>) {
|
||||
macro_rules! assert_metadata {
|
||||
@@ -151,3 +184,36 @@ pub fn assert_parquet_metadata_eq(a: Arc<ParquetMetaData>, b: Arc<ParquetMetaDat
|
||||
|
||||
assert_metadata!(a, b, row_groups, column_index, offset_index,);
|
||||
}
|
||||
|
||||
/// Creates a new region metadata for testing SSTs with binary datatype.
|
||||
///
|
||||
/// Schema: tag_0(string), field_0(binary), ts
|
||||
pub fn build_test_binary_test_region_metadata() -> RegionMetadataRef {
|
||||
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
|
||||
builder
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"tag_0".to_string(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
true,
|
||||
),
|
||||
semantic_type: SemanticType::Tag,
|
||||
column_id: 0,
|
||||
})
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new("field_0", ConcreteDataType::binary_datatype(), true),
|
||||
semantic_type: SemanticType::Field,
|
||||
column_id: 1,
|
||||
})
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"ts",
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
),
|
||||
semantic_type: SemanticType::Timestamp,
|
||||
column_id: 2,
|
||||
})
|
||||
.primary_key(vec![0]);
|
||||
Arc::new(builder.build().unwrap())
|
||||
}
|
||||
|
||||
@@ -159,7 +159,7 @@ pub fn convert_value(param: &ParamValue, t: &ConcreteDataType) -> Result<ScalarV
|
||||
ConcreteDataType::String(_) => Ok(ScalarValue::Utf8(Some(
|
||||
String::from_utf8_lossy(b).to_string(),
|
||||
))),
|
||||
ConcreteDataType::Binary(_) => Ok(ScalarValue::LargeBinary(Some(b.to_vec()))),
|
||||
ConcreteDataType::Binary(_) => Ok(ScalarValue::Binary(Some(b.to_vec()))),
|
||||
|
||||
_ => error::PreparedStmtTypeMismatchSnafu {
|
||||
expected: t,
|
||||
|
||||
Reference in New Issue
Block a user