feat: support different types for CompatReader (#3745)

* feat: support different types for `CompatReader`

* chore: only compare what we need (`data_type`)

* fix: optimize code based on review suggestions

- add unit test `test_safe_cast_to_null` to cover safe casting
- add DataType to projected_fields
- remove TODO

* fix: `assert_eq` failure in `projection.rs`

* style: codefmt

* style: fix the code based on review suggestions

Author: Kould
Date: 2024-04-24 14:27:52 +08:00
Committed by: GitHub
Parent: 20a933e395
Commit: 42e7403fcc
5 changed files with 235 additions and 85 deletions
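In short, the change teaches the compat layer to record an optional target type next to each field index, and to safe-cast a column while converting a batch. Below is a minimal, self-contained sketch of that plan-then-cast decision flow; the `DataType`, `Column`, and `IndexOrCast` names are simplified stand-ins invented for this illustration, not the GreptimeDB types used in the diff.

```rust
// Minimal sketch of the CompatFields idea, with hypothetical stand-in types.
// The real code uses ColumnId, ConcreteDataType, BatchColumn and VectorRef::cast.

#[derive(Clone, Debug, PartialEq)]
enum DataType {
    Int64,
    String,
}

#[derive(Clone, Debug)]
struct Column {
    id: u32,
    data_type: DataType,
}

/// Mirrors `IndexOrDefault::Index { pos, cast_type }`: either reuse the
/// column at `pos` as-is, or cast it to the expected type first.
enum IndexOrCast {
    Index { pos: usize, cast_type: Option<DataType> },
}

/// Mirrors `may_compat_fields`: for each expected field, look up the actual
/// column and record a cast only when the data types differ.
fn plan_compat(actual: &[Column], expect: &[(u32, DataType)]) -> Vec<IndexOrCast> {
    expect
        .iter()
        .map(|(id, expect_ty)| {
            let pos = actual.iter().position(|c| c.id == *id).expect("field exists");
            let cast_type =
                (actual[pos].data_type != *expect_ty).then(|| expect_ty.clone());
            IndexOrCast::Index { pos, cast_type }
        })
        .collect()
}

fn main() {
    let actual = vec![
        Column { id: 2, data_type: DataType::Int64 },
        Column { id: 3, data_type: DataType::String },
    ];
    // Column 2 is now expected as String, so it gets a cast; column 3 does not.
    let expect = vec![(2, DataType::String), (3, DataType::String)];
    for step in plan_compat(&actual, &expect) {
        let IndexOrCast::Index { pos, cast_type } = step;
        println!("pos={pos}, cast={cast_type:?}");
    }
}
```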


@@ -207,4 +207,21 @@ mod tests {
         assert!(c.is_null(2));
     }
 
+    #[test]
+    fn test_safe_cast_to_null() {
+        let string_vector = Arc::new(StringVector::from(vec![
+            Some("1"),
+            Some("hello"),
+            Some(&i64::MAX.to_string()),
+            None,
+        ])) as VectorRef;
+        let to_type = ConcreteDataType::int32_datatype();
+        let b = string_vector.cast(&to_type).unwrap();
+        let c = b.as_any().downcast_ref::<Int32Vector>().unwrap();
+        assert_eq!(Value::Int32(1), c.get(0));
+        assert_eq!(Value::Null, c.get(1));
+        assert_eq!(Value::Null, c.get(2));
+        assert_eq!(Value::Null, c.get(3));
+    }
 }
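The test above exercises string parsing failures. Assuming the safe-cast semantics this patch relies on (unconvertible values become null) also apply to numeric narrowing, a hypothetical companion test could look like the following; `Int64Vector` and its `From` impl are assumed here to mirror the `StringVector` API shown above, and this test is not part of the patch.

```rust
// Hypothetical companion test, not part of this patch: numeric narrowing
// that overflows should also be replaced with null under a safe cast.
#[test]
fn test_safe_cast_overflow_to_null() {
    let int64_vector = Arc::new(Int64Vector::from(vec![
        Some(1i64),
        Some(i64::MAX), // does not fit into i32, so a safe cast nulls it
        None,
    ])) as VectorRef;
    let b = int64_vector
        .cast(&ConcreteDataType::int32_datatype())
        .unwrap();
    let c = b.as_any().downcast_ref::<Int32Vector>().unwrap();
    assert_eq!(Value::Int32(1), c.get(0));
    assert_eq!(Value::Null, c.get(1));
    assert_eq!(Value::Null, c.get(2));
}
```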


@@ -382,17 +382,17 @@ impl Batch {
         self.take_in_place(&indices)
     }
 
-    /// Returns ids of fields in the [Batch] after applying the `projection`.
+    /// Returns ids and datatypes of fields in the [Batch] after applying the `projection`.
     pub(crate) fn projected_fields(
         metadata: &RegionMetadata,
         projection: &[ColumnId],
-    ) -> Vec<ColumnId> {
+    ) -> Vec<(ColumnId, ConcreteDataType)> {
         let projected_ids: HashSet<_> = projection.iter().copied().collect();
         metadata
             .field_columns()
             .filter_map(|column| {
                 if projected_ids.contains(&column.column_id) {
-                    Some(column.column_id)
+                    Some((column.column_id, column.column_schema.data_type.clone()))
                 } else {
                     None
                 }


@@ -16,6 +16,7 @@
 use std::collections::HashMap;
 
+use datatypes::data_type::ConcreteDataType;
 use datatypes::value::Value;
 use datatypes::vectors::VectorRef;
 use snafu::{ensure, OptionExt, ResultExt};
@@ -85,7 +86,7 @@ pub(crate) fn has_same_columns(left: &RegionMetadata, right: &RegionMetadata) ->
     }
 
     for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
-        if left_col.column_id != right_col.column_id {
+        if left_col.column_id != right_col.column_id || !left_col.is_same_datatype(right_col) {
             return false;
         }
         debug_assert_eq!(
@@ -134,8 +135,8 @@ impl CompatPrimaryKey {
 /// Helper to make fields compatible.
 #[derive(Debug)]
 struct CompatFields {
-    /// Column Ids the reader actually returns.
-    actual_fields: Vec<ColumnId>,
+    /// Column Ids and DataTypes the reader actually returns.
+    actual_fields: Vec<(ColumnId, ConcreteDataType)>,
     /// Indices to convert actual fields to expect fields.
     index_or_defaults: Vec<IndexOrDefault>,
 }
@@ -149,14 +150,28 @@ impl CompatFields {
             .actual_fields
             .iter()
             .zip(batch.fields())
-            .all(|(id, batch_column)| *id == batch_column.column_id));
+            .all(|((id, _), batch_column)| *id == batch_column.column_id));
 
         let len = batch.num_rows();
         let fields = self
             .index_or_defaults
             .iter()
             .map(|index_or_default| match index_or_default {
-                IndexOrDefault::Index(index) => batch.fields()[*index].clone(),
+                IndexOrDefault::Index { pos, cast_type } => {
+                    let old_column = &batch.fields()[*pos];
+
+                    let data = if let Some(ty) = cast_type {
+                        // Safety: we ensure the type can be converted, so the new batch is valid.
+                        // Note: `safe` must be true in `CastOptions` so that any value that
+                        // cannot be converted is replaced with null.
+                        old_column.data.cast(ty).unwrap()
+                    } else {
+                        old_column.data.clone()
+                    };
+                    BatchColumn {
+                        column_id: old_column.column_id,
+                        data,
+                    }
+                }
                 IndexOrDefault::DefaultValue {
                     column_id,
                     default_vector,
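The `CastOptions` mentioned in the comment above is Arrow's. For reference, this is what the safe-by-default behavior looks like against arrow-rs directly: a standalone sketch assuming the `arrow` crate as a dependency; that GreptimeDB's vector `cast` sits on top of this kernel is implied by the comment, not shown in this diff.

```rust
use arrow::array::{Array, Int32Array, StringArray};
use arrow::compute::cast;
use arrow::datatypes::DataType;

fn main() {
    // `cast` follows `cast_with_options` with `CastOptions::default()`,
    // whose default is `safe: true`: values that cannot be converted
    // become null instead of producing an error.
    let strings = StringArray::from(vec![Some("1"), Some("hello"), None]);
    let casted = cast(&strings, &DataType::Int32).unwrap();
    let ints = casted.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(ints.value(0), 1);
    assert!(ints.is_null(1));
    assert!(ints.is_null(2));
}
```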
@@ -248,15 +263,23 @@ fn may_compat_fields(
     let source_field_index: HashMap<_, _> = actual_fields
         .iter()
         .enumerate()
-        .map(|(idx, column_id)| (*column_id, idx))
+        .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
         .collect();
 
     let index_or_defaults = expect_fields
         .iter()
-        .map(|column_id| {
-            if let Some(index) = source_field_index.get(column_id) {
+        .map(|(column_id, expect_data_type)| {
+            if let Some((index, actual_data_type)) = source_field_index.get(column_id) {
+                let mut cast_type = None;
+                if expect_data_type != *actual_data_type {
+                    cast_type = Some(expect_data_type.clone())
+                }
                 // Source has this field.
-                Ok(IndexOrDefault::Index(*index))
+                Ok(IndexOrDefault::Index {
+                    pos: *index,
+                    cast_type,
+                })
             } else {
                 // Safety: mapper must have this column.
                 let column = mapper.metadata().column_by_id(*column_id).unwrap();
@@ -293,7 +316,10 @@ fn may_compat_fields(
 #[derive(Debug)]
 enum IndexOrDefault {
     /// Index of the column in source batch.
-    Index(usize),
+    Index {
+        pos: usize,
+        cast_type: Option<ConcreteDataType>,
+    },
     /// Default value for the column.
     DefaultValue {
         /// Id of the column.
@@ -320,27 +346,19 @@ mod tests {
     /// Creates a new [RegionMetadata].
     fn new_metadata(
-        semantic_types: &[(ColumnId, SemanticType)],
+        semantic_types: &[(ColumnId, SemanticType, ConcreteDataType)],
         primary_key: &[ColumnId],
     ) -> RegionMetadata {
         let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
-        for (id, semantic_type) in semantic_types {
+        for (id, semantic_type, data_type) in semantic_types {
             let column_schema = match semantic_type {
-                SemanticType::Tag => ColumnSchema::new(
-                    format!("tag_{id}"),
-                    ConcreteDataType::string_datatype(),
-                    true,
-                ),
-                SemanticType::Field => ColumnSchema::new(
-                    format!("field_{id}"),
-                    ConcreteDataType::int64_datatype(),
-                    true,
-                ),
-                SemanticType::Timestamp => ColumnSchema::new(
-                    "ts",
-                    ConcreteDataType::timestamp_millisecond_datatype(),
-                    false,
-                ),
+                SemanticType::Tag => {
+                    ColumnSchema::new(format!("tag_{id}"), data_type.clone(), true)
+                }
+                SemanticType::Field => {
+                    ColumnSchema::new(format!("field_{id}"), data_type.clone(), true)
+                }
+                SemanticType::Timestamp => ColumnSchema::new("ts", data_type.clone(), false),
             };
 
             builder.push_column_metadata(ColumnMetadata {
@@ -409,18 +427,26 @@ mod tests {
     fn test_invalid_pk_len() {
         let reader_meta = new_metadata(
             &[
-                (0, SemanticType::Timestamp),
-                (1, SemanticType::Tag),
-                (2, SemanticType::Tag),
-                (3, SemanticType::Field),
+                (
+                    0,
+                    SemanticType::Timestamp,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                ),
+                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
             ],
             &[1, 2],
         );
         let expect_meta = new_metadata(
             &[
-                (0, SemanticType::Timestamp),
-                (1, SemanticType::Tag),
-                (2, SemanticType::Field),
+                (
+                    0,
+                    SemanticType::Timestamp,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                ),
+                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
             ],
             &[1],
         );
@@ -431,20 +457,28 @@ mod tests {
     fn test_different_pk() {
         let reader_meta = new_metadata(
             &[
-                (0, SemanticType::Timestamp),
-                (1, SemanticType::Tag),
-                (2, SemanticType::Tag),
-                (3, SemanticType::Field),
+                (
+                    0,
+                    SemanticType::Timestamp,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                ),
+                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
             ],
             &[2, 1],
         );
         let expect_meta = new_metadata(
             &[
-                (0, SemanticType::Timestamp),
-                (1, SemanticType::Tag),
-                (2, SemanticType::Tag),
-                (3, SemanticType::Field),
-                (4, SemanticType::Tag),
+                (
+                    0,
+                    SemanticType::Timestamp,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                ),
+                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
+                (4, SemanticType::Tag, ConcreteDataType::string_datatype()),
             ],
             &[1, 2, 4],
         );
@@ -455,9 +489,13 @@ mod tests {
     fn test_same_pk() {
         let reader_meta = new_metadata(
             &[
-                (0, SemanticType::Timestamp),
-                (1, SemanticType::Tag),
-                (2, SemanticType::Field),
+                (
+                    0,
+                    SemanticType::Timestamp,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                ),
+                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
             ],
             &[1],
         );
@@ -470,9 +508,13 @@ mod tests {
     fn test_same_fields() {
         let reader_meta = Arc::new(new_metadata(
             &[
-                (0, SemanticType::Timestamp),
-                (1, SemanticType::Tag),
-                (2, SemanticType::Field),
+                (
+                    0,
+                    SemanticType::Timestamp,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                ),
+                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
             ],
             &[1],
         ));
@@ -484,19 +526,27 @@ mod tests {
     async fn test_compat_reader() {
         let reader_meta = Arc::new(new_metadata(
             &[
-                (0, SemanticType::Timestamp),
-                (1, SemanticType::Tag),
-                (2, SemanticType::Field),
+                (
+                    0,
+                    SemanticType::Timestamp,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                ),
+                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
             ],
             &[1],
         ));
         let expect_meta = Arc::new(new_metadata(
             &[
-                (0, SemanticType::Timestamp),
-                (1, SemanticType::Tag),
-                (2, SemanticType::Field),
-                (3, SemanticType::Tag),
-                (4, SemanticType::Field),
+                (
+                    0,
+                    SemanticType::Timestamp,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                ),
+                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
+                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
             ],
             &[1, 3],
         ));
@@ -525,19 +575,27 @@ mod tests {
     async fn test_compat_reader_different_order() {
         let reader_meta = Arc::new(new_metadata(
             &[
-                (0, SemanticType::Timestamp),
-                (1, SemanticType::Tag),
-                (2, SemanticType::Field),
+                (
+                    0,
+                    SemanticType::Timestamp,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                ),
+                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
             ],
             &[1],
         ));
         let expect_meta = Arc::new(new_metadata(
             &[
-                (0, SemanticType::Timestamp),
-                (1, SemanticType::Tag),
-                (3, SemanticType::Field),
-                (2, SemanticType::Field),
-                (4, SemanticType::Field),
+                (
+                    0,
+                    SemanticType::Timestamp,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                ),
+                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
+                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
+                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
             ],
             &[1],
         ));
@@ -561,22 +619,84 @@
     }
 
     #[tokio::test]
-    async fn test_compat_reader_projection() {
-        let reader_meta = Arc::new(new_metadata(
+    async fn test_compat_reader_different_types() {
+        let actual_meta = Arc::new(new_metadata(
             &[
-                (0, SemanticType::Timestamp),
-                (1, SemanticType::Tag),
-                (2, SemanticType::Field),
+                (
+                    0,
+                    SemanticType::Timestamp,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                ),
+                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
             ],
             &[1],
         ));
         let expect_meta = Arc::new(new_metadata(
             &[
-                (0, SemanticType::Timestamp),
-                (1, SemanticType::Tag),
-                (3, SemanticType::Field),
-                (2, SemanticType::Field),
-                (4, SemanticType::Field),
+                (
+                    0,
+                    SemanticType::Timestamp,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                ),
+                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (2, SemanticType::Field, ConcreteDataType::string_datatype()),
             ],
             &[1],
         ));
+        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
+        let k1 = encode_key(&[Some("a")]);
+        let k2 = encode_key(&[Some("b")]);
+        let source_reader = VecBatchReader::new(&[
+            new_batch(&k1, &[(2, false)], 1000, 3),
+            new_batch(&k2, &[(2, false)], 1000, 3),
+        ]);
+        let fn_batch_cast = |batch: Batch| {
+            let mut new_fields = batch.fields.clone();
+            new_fields[0].data = new_fields[0]
+                .data
+                .cast(&ConcreteDataType::string_datatype())
+                .unwrap();
+            batch.with_fields(new_fields).unwrap()
+        };
+
+        let mut compat_reader = CompatReader::new(&mapper, actual_meta, source_reader).unwrap();
+        check_reader_result(
+            &mut compat_reader,
+            &[
+                fn_batch_cast(new_batch(&k1, &[(2, false)], 1000, 3)),
+                fn_batch_cast(new_batch(&k2, &[(2, false)], 1000, 3)),
+            ],
+        )
+        .await;
+    }
+
+    #[tokio::test]
+    async fn test_compat_reader_projection() {
+        let reader_meta = Arc::new(new_metadata(
+            &[
+                (
+                    0,
+                    SemanticType::Timestamp,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                ),
+                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
+            ],
+            &[1],
+        ));
+        let expect_meta = Arc::new(new_metadata(
+            &[
+                (
+                    0,
+                    SemanticType::Timestamp,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                ),
+                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
+                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
+                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
+                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));


@@ -53,8 +53,8 @@ pub struct ProjectionMapper {
     /// Ids of columns to project. It keeps ids in the same order as the `projection`
     /// indices to build the mapper.
     column_ids: Vec<ColumnId>,
-    /// Ids of field columns in the [Batch].
-    batch_fields: Vec<ColumnId>,
+    /// Ids and DataTypes of field columns in the [Batch].
+    batch_fields: Vec<(ColumnId, ConcreteDataType)>,
 }
 
 impl ProjectionMapper {
@@ -95,7 +95,7 @@ impl ProjectionMapper {
         let field_id_to_index: HashMap<_, _> = batch_fields
             .iter()
             .enumerate()
-            .map(|(index, column_id)| (*column_id, index))
+            .map(|(index, (column_id, _))| (*column_id, index))
             .collect();
         // For each projected column, compute its index in batches.
         let mut batch_indices = Vec::with_capacity(projection.len());
@@ -151,7 +151,7 @@ impl ProjectionMapper {
     }
 
     /// Returns ids of fields in [Batch]es the mapper expects to convert.
-    pub(crate) fn batch_fields(&self) -> &[ColumnId] {
+    pub(crate) fn batch_fields(&self) -> &[(ColumnId, ConcreteDataType)] {
         &self.batch_fields
     }
@@ -173,7 +173,7 @@ impl ProjectionMapper {
             .batch_fields
             .iter()
             .zip(batch.fields())
-            .all(|(id, batch_col)| *id == batch_col.column_id));
+            .all(|((id, _), batch_col)| *id == batch_col.column_id));
 
         // Skips decoding pk if we don't need to output it.
         let pk_values = if self.has_tags {
@@ -344,7 +344,13 @@ mod tests {
         );
         let mapper = ProjectionMapper::all(&metadata).unwrap();
         assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
-        assert_eq!([3, 4], mapper.batch_fields());
+        assert_eq!(
+            [
+                (3, ConcreteDataType::int64_datatype()),
+                (4, ConcreteDataType::int64_datatype())
+            ],
+            mapper.batch_fields()
+        );
 
         // With vector cache.
         let cache = CacheManager::builder().vector_cache_size(1024).build();
@@ -378,7 +384,10 @@ mod tests {
         // Columns v1, k0
         let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter()).unwrap();
         assert_eq!([4, 1], mapper.column_ids());
-        assert_eq!([4], mapper.batch_fields());
+        assert_eq!(
+            [(4, ConcreteDataType::int64_datatype())],
+            mapper.batch_fields()
+        );
 
         let batch = new_batch(0, &[1, 2], &[(4, 4)], 3);
         let record_batch = mapper.convert(&batch, None).unwrap();


@@ -107,6 +107,10 @@ impl ColumnMetadata {
     pub fn decode_list(bytes: &[u8]) -> serde_json::Result<Vec<Self>> {
         serde_json::from_slice(bytes)
     }
+
+    pub fn is_same_datatype(&self, other: &Self) -> bool {
+        self.column_schema.data_type == other.column_schema.data_type
+    }
 }
 
 #[cfg_attr(doc, aquamarine::aquamarine)]