From 42e7403fccd78ae62e3bc40fc32608fe77958bd3 Mon Sep 17 00:00:00 2001 From: Kould Date: Wed, 24 Apr 2024 14:27:52 +0800 Subject: [PATCH] feat: support different types for `CompatReader` (#3745) * feat: support different types for `CompatReader` * chore: only compare whether we need: (data_type) * fix: optimize code based on review suggestions - add unit test `test_safe_cast_to_null` to test safely cast - add DataType to projected_fields - remove TODO * fix: assert_eq fail on `projection.rs` * style: codefmt * style: fix the code based on review suggestions --- src/datatypes/src/vectors/operations/cast.rs | 17 ++ src/mito2/src/read.rs | 6 +- src/mito2/src/read/compat.rs | 270 +++++++++++++------ src/mito2/src/read/projection.rs | 23 +- src/store-api/src/metadata.rs | 4 + 5 files changed, 235 insertions(+), 85 deletions(-) diff --git a/src/datatypes/src/vectors/operations/cast.rs b/src/datatypes/src/vectors/operations/cast.rs index 4ab154fa65..beb0aa9723 100644 --- a/src/datatypes/src/vectors/operations/cast.rs +++ b/src/datatypes/src/vectors/operations/cast.rs @@ -207,4 +207,21 @@ mod tests { assert!(c.is_null(2)); } } + + #[test] + fn test_safe_cast_to_null() { + let string_vector = Arc::new(StringVector::from(vec![ + Some("1"), + Some("hello"), + Some(&i64::MAX.to_string()), + None, + ])) as VectorRef; + let to_type = ConcreteDataType::int32_datatype(); + let b = string_vector.cast(&to_type).unwrap(); + let c = b.as_any().downcast_ref::().unwrap(); + assert_eq!(Value::Int32(1), c.get(0)); + assert_eq!(Value::Null, c.get(1)); + assert_eq!(Value::Null, c.get(2)); + assert_eq!(Value::Null, c.get(3)); + } } diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 9b0567ef61..a222eb886c 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -382,17 +382,17 @@ impl Batch { self.take_in_place(&indices) } - /// Returns ids of fields in the [Batch] after applying the `projection`. + /// Returns ids and datatypes of fields in the [Batch] after applying the `projection`. pub(crate) fn projected_fields( metadata: &RegionMetadata, projection: &[ColumnId], - ) -> Vec { + ) -> Vec<(ColumnId, ConcreteDataType)> { let projected_ids: HashSet<_> = projection.iter().copied().collect(); metadata .field_columns() .filter_map(|column| { if projected_ids.contains(&column.column_id) { - Some(column.column_id) + Some((column.column_id, column.column_schema.data_type.clone())) } else { None } diff --git a/src/mito2/src/read/compat.rs b/src/mito2/src/read/compat.rs index 8c531655e4..c900600f33 100644 --- a/src/mito2/src/read/compat.rs +++ b/src/mito2/src/read/compat.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; +use datatypes::data_type::ConcreteDataType; use datatypes::value::Value; use datatypes::vectors::VectorRef; use snafu::{ensure, OptionExt, ResultExt}; @@ -85,7 +86,7 @@ pub(crate) fn has_same_columns(left: &RegionMetadata, right: &RegionMetadata) -> } for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) { - if left_col.column_id != right_col.column_id { + if left_col.column_id != right_col.column_id || !left_col.is_same_datatype(right_col) { return false; } debug_assert_eq!( @@ -134,8 +135,8 @@ impl CompatPrimaryKey { /// Helper to make fields compatible. #[derive(Debug)] struct CompatFields { - /// Column Ids the reader actually returns. - actual_fields: Vec, + /// Column Ids and DataTypes the reader actually returns. + actual_fields: Vec<(ColumnId, ConcreteDataType)>, /// Indices to convert actual fields to expect fields. index_or_defaults: Vec, } @@ -149,14 +150,28 @@ impl CompatFields { .actual_fields .iter() .zip(batch.fields()) - .all(|(id, batch_column)| *id == batch_column.column_id)); + .all(|((id, _), batch_column)| *id == batch_column.column_id)); let len = batch.num_rows(); let fields = self .index_or_defaults .iter() .map(|index_or_default| match index_or_default { - IndexOrDefault::Index(index) => batch.fields()[*index].clone(), + IndexOrDefault::Index { pos, cast_type } => { + let old_column = &batch.fields()[*pos]; + + let data = if let Some(ty) = cast_type { + // Safety: We ensure type can be converted and the new batch should be valid. + // Tips: `safe` must be true in `CastOptions`, which will replace the specific value with null when it cannot be converted. + old_column.data.cast(ty).unwrap() + } else { + old_column.data.clone() + }; + BatchColumn { + column_id: old_column.column_id, + data, + } + } IndexOrDefault::DefaultValue { column_id, default_vector, @@ -248,15 +263,23 @@ fn may_compat_fields( let source_field_index: HashMap<_, _> = actual_fields .iter() .enumerate() - .map(|(idx, column_id)| (*column_id, idx)) + .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type))) .collect(); let index_or_defaults = expect_fields .iter() - .map(|column_id| { - if let Some(index) = source_field_index.get(column_id) { + .map(|(column_id, expect_data_type)| { + if let Some((index, actual_data_type)) = source_field_index.get(column_id) { + let mut cast_type = None; + + if expect_data_type != *actual_data_type { + cast_type = Some(expect_data_type.clone()) + } // Source has this field. - Ok(IndexOrDefault::Index(*index)) + Ok(IndexOrDefault::Index { + pos: *index, + cast_type, + }) } else { // Safety: mapper must have this column. let column = mapper.metadata().column_by_id(*column_id).unwrap(); @@ -293,7 +316,10 @@ fn may_compat_fields( #[derive(Debug)] enum IndexOrDefault { /// Index of the column in source batch. - Index(usize), + Index { + pos: usize, + cast_type: Option, + }, /// Default value for the column. DefaultValue { /// Id of the column. @@ -320,27 +346,19 @@ mod tests { /// Creates a new [RegionMetadata]. fn new_metadata( - semantic_types: &[(ColumnId, SemanticType)], + semantic_types: &[(ColumnId, SemanticType, ConcreteDataType)], primary_key: &[ColumnId], ) -> RegionMetadata { let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1)); - for (id, semantic_type) in semantic_types { + for (id, semantic_type, data_type) in semantic_types { let column_schema = match semantic_type { - SemanticType::Tag => ColumnSchema::new( - format!("tag_{id}"), - ConcreteDataType::string_datatype(), - true, - ), - SemanticType::Field => ColumnSchema::new( - format!("field_{id}"), - ConcreteDataType::int64_datatype(), - true, - ), - SemanticType::Timestamp => ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ), + SemanticType::Tag => { + ColumnSchema::new(format!("tag_{id}"), data_type.clone(), true) + } + SemanticType::Field => { + ColumnSchema::new(format!("field_{id}"), data_type.clone(), true) + } + SemanticType::Timestamp => ColumnSchema::new("ts", data_type.clone(), false), }; builder.push_column_metadata(ColumnMetadata { @@ -409,18 +427,26 @@ mod tests { fn test_invalid_pk_len() { let reader_meta = new_metadata( &[ - (0, SemanticType::Timestamp), - (1, SemanticType::Tag), - (2, SemanticType::Tag), - (3, SemanticType::Field), + ( + 0, + SemanticType::Timestamp, + ConcreteDataType::timestamp_millisecond_datatype(), + ), + (1, SemanticType::Tag, ConcreteDataType::string_datatype()), + (2, SemanticType::Tag, ConcreteDataType::string_datatype()), + (3, SemanticType::Field, ConcreteDataType::int64_datatype()), ], &[1, 2], ); let expect_meta = new_metadata( &[ - (0, SemanticType::Timestamp), - (1, SemanticType::Tag), - (2, SemanticType::Field), + ( + 0, + SemanticType::Timestamp, + ConcreteDataType::timestamp_millisecond_datatype(), + ), + (1, SemanticType::Tag, ConcreteDataType::string_datatype()), + (2, SemanticType::Field, ConcreteDataType::int64_datatype()), ], &[1], ); @@ -431,20 +457,28 @@ mod tests { fn test_different_pk() { let reader_meta = new_metadata( &[ - (0, SemanticType::Timestamp), - (1, SemanticType::Tag), - (2, SemanticType::Tag), - (3, SemanticType::Field), + ( + 0, + SemanticType::Timestamp, + ConcreteDataType::timestamp_millisecond_datatype(), + ), + (1, SemanticType::Tag, ConcreteDataType::string_datatype()), + (2, SemanticType::Tag, ConcreteDataType::string_datatype()), + (3, SemanticType::Field, ConcreteDataType::int64_datatype()), ], &[2, 1], ); let expect_meta = new_metadata( &[ - (0, SemanticType::Timestamp), - (1, SemanticType::Tag), - (2, SemanticType::Tag), - (3, SemanticType::Field), - (4, SemanticType::Tag), + ( + 0, + SemanticType::Timestamp, + ConcreteDataType::timestamp_millisecond_datatype(), + ), + (1, SemanticType::Tag, ConcreteDataType::string_datatype()), + (2, SemanticType::Tag, ConcreteDataType::string_datatype()), + (3, SemanticType::Field, ConcreteDataType::int64_datatype()), + (4, SemanticType::Tag, ConcreteDataType::string_datatype()), ], &[1, 2, 4], ); @@ -455,9 +489,13 @@ mod tests { fn test_same_pk() { let reader_meta = new_metadata( &[ - (0, SemanticType::Timestamp), - (1, SemanticType::Tag), - (2, SemanticType::Field), + ( + 0, + SemanticType::Timestamp, + ConcreteDataType::timestamp_millisecond_datatype(), + ), + (1, SemanticType::Tag, ConcreteDataType::string_datatype()), + (2, SemanticType::Field, ConcreteDataType::int64_datatype()), ], &[1], ); @@ -470,9 +508,13 @@ mod tests { fn test_same_fields() { let reader_meta = Arc::new(new_metadata( &[ - (0, SemanticType::Timestamp), - (1, SemanticType::Tag), - (2, SemanticType::Field), + ( + 0, + SemanticType::Timestamp, + ConcreteDataType::timestamp_millisecond_datatype(), + ), + (1, SemanticType::Tag, ConcreteDataType::string_datatype()), + (2, SemanticType::Field, ConcreteDataType::int64_datatype()), ], &[1], )); @@ -484,19 +526,27 @@ mod tests { async fn test_compat_reader() { let reader_meta = Arc::new(new_metadata( &[ - (0, SemanticType::Timestamp), - (1, SemanticType::Tag), - (2, SemanticType::Field), + ( + 0, + SemanticType::Timestamp, + ConcreteDataType::timestamp_millisecond_datatype(), + ), + (1, SemanticType::Tag, ConcreteDataType::string_datatype()), + (2, SemanticType::Field, ConcreteDataType::int64_datatype()), ], &[1], )); let expect_meta = Arc::new(new_metadata( &[ - (0, SemanticType::Timestamp), - (1, SemanticType::Tag), - (2, SemanticType::Field), - (3, SemanticType::Tag), - (4, SemanticType::Field), + ( + 0, + SemanticType::Timestamp, + ConcreteDataType::timestamp_millisecond_datatype(), + ), + (1, SemanticType::Tag, ConcreteDataType::string_datatype()), + (2, SemanticType::Field, ConcreteDataType::int64_datatype()), + (3, SemanticType::Tag, ConcreteDataType::string_datatype()), + (4, SemanticType::Field, ConcreteDataType::int64_datatype()), ], &[1, 3], )); @@ -525,19 +575,27 @@ mod tests { async fn test_compat_reader_different_order() { let reader_meta = Arc::new(new_metadata( &[ - (0, SemanticType::Timestamp), - (1, SemanticType::Tag), - (2, SemanticType::Field), + ( + 0, + SemanticType::Timestamp, + ConcreteDataType::timestamp_millisecond_datatype(), + ), + (1, SemanticType::Tag, ConcreteDataType::string_datatype()), + (2, SemanticType::Field, ConcreteDataType::int64_datatype()), ], &[1], )); let expect_meta = Arc::new(new_metadata( &[ - (0, SemanticType::Timestamp), - (1, SemanticType::Tag), - (3, SemanticType::Field), - (2, SemanticType::Field), - (4, SemanticType::Field), + ( + 0, + SemanticType::Timestamp, + ConcreteDataType::timestamp_millisecond_datatype(), + ), + (1, SemanticType::Tag, ConcreteDataType::string_datatype()), + (3, SemanticType::Field, ConcreteDataType::int64_datatype()), + (2, SemanticType::Field, ConcreteDataType::int64_datatype()), + (4, SemanticType::Field, ConcreteDataType::int64_datatype()), ], &[1], )); @@ -561,22 +619,84 @@ mod tests { } #[tokio::test] - async fn test_compat_reader_projection() { - let reader_meta = Arc::new(new_metadata( + async fn test_compat_reader_different_types() { + let actual_meta = Arc::new(new_metadata( &[ - (0, SemanticType::Timestamp), - (1, SemanticType::Tag), - (2, SemanticType::Field), + ( + 0, + SemanticType::Timestamp, + ConcreteDataType::timestamp_millisecond_datatype(), + ), + (1, SemanticType::Tag, ConcreteDataType::string_datatype()), + (2, SemanticType::Field, ConcreteDataType::int64_datatype()), ], &[1], )); let expect_meta = Arc::new(new_metadata( &[ - (0, SemanticType::Timestamp), - (1, SemanticType::Tag), - (3, SemanticType::Field), - (2, SemanticType::Field), - (4, SemanticType::Field), + ( + 0, + SemanticType::Timestamp, + ConcreteDataType::timestamp_millisecond_datatype(), + ), + (1, SemanticType::Tag, ConcreteDataType::string_datatype()), + (2, SemanticType::Field, ConcreteDataType::string_datatype()), + ], + &[1], + )); + let mapper = ProjectionMapper::all(&expect_meta).unwrap(); + let k1 = encode_key(&[Some("a")]); + let k2 = encode_key(&[Some("b")]); + let source_reader = VecBatchReader::new(&[ + new_batch(&k1, &[(2, false)], 1000, 3), + new_batch(&k2, &[(2, false)], 1000, 3), + ]); + + let fn_batch_cast = |batch: Batch| { + let mut new_fields = batch.fields.clone(); + new_fields[0].data = new_fields[0] + .data + .cast(&ConcreteDataType::string_datatype()) + .unwrap(); + + batch.with_fields(new_fields).unwrap() + }; + let mut compat_reader = CompatReader::new(&mapper, actual_meta, source_reader).unwrap(); + check_reader_result( + &mut compat_reader, + &[ + fn_batch_cast(new_batch(&k1, &[(2, false)], 1000, 3)), + fn_batch_cast(new_batch(&k2, &[(2, false)], 1000, 3)), + ], + ) + .await; + } + + #[tokio::test] + async fn test_compat_reader_projection() { + let reader_meta = Arc::new(new_metadata( + &[ + ( + 0, + SemanticType::Timestamp, + ConcreteDataType::timestamp_millisecond_datatype(), + ), + (1, SemanticType::Tag, ConcreteDataType::string_datatype()), + (2, SemanticType::Field, ConcreteDataType::int64_datatype()), + ], + &[1], + )); + let expect_meta = Arc::new(new_metadata( + &[ + ( + 0, + SemanticType::Timestamp, + ConcreteDataType::timestamp_millisecond_datatype(), + ), + (1, SemanticType::Tag, ConcreteDataType::string_datatype()), + (3, SemanticType::Field, ConcreteDataType::int64_datatype()), + (2, SemanticType::Field, ConcreteDataType::int64_datatype()), + (4, SemanticType::Field, ConcreteDataType::int64_datatype()), ], &[1], )); diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs index c849a2582a..a6fab72671 100644 --- a/src/mito2/src/read/projection.rs +++ b/src/mito2/src/read/projection.rs @@ -53,8 +53,8 @@ pub struct ProjectionMapper { /// Ids of columns to project. It keeps ids in the same order as the `projection` /// indices to build the mapper. column_ids: Vec, - /// Ids of field columns in the [Batch]. - batch_fields: Vec, + /// Ids and DataTypes of field columns in the [Batch]. + batch_fields: Vec<(ColumnId, ConcreteDataType)>, } impl ProjectionMapper { @@ -95,7 +95,7 @@ impl ProjectionMapper { let field_id_to_index: HashMap<_, _> = batch_fields .iter() .enumerate() - .map(|(index, column_id)| (*column_id, index)) + .map(|(index, (column_id, _))| (*column_id, index)) .collect(); // For each projected column, compute its index in batches. let mut batch_indices = Vec::with_capacity(projection.len()); @@ -151,7 +151,7 @@ impl ProjectionMapper { } /// Returns ids of fields in [Batch]es the mapper expects to convert. - pub(crate) fn batch_fields(&self) -> &[ColumnId] { + pub(crate) fn batch_fields(&self) -> &[(ColumnId, ConcreteDataType)] { &self.batch_fields } @@ -173,7 +173,7 @@ impl ProjectionMapper { .batch_fields .iter() .zip(batch.fields()) - .all(|(id, batch_col)| *id == batch_col.column_id)); + .all(|((id, _), batch_col)| *id == batch_col.column_id)); // Skips decoding pk if we don't need to output it. let pk_values = if self.has_tags { @@ -344,7 +344,13 @@ mod tests { ); let mapper = ProjectionMapper::all(&metadata).unwrap(); assert_eq!([0, 1, 2, 3, 4], mapper.column_ids()); - assert_eq!([3, 4], mapper.batch_fields()); + assert_eq!( + [ + (3, ConcreteDataType::int64_datatype()), + (4, ConcreteDataType::int64_datatype()) + ], + mapper.batch_fields() + ); // With vector cache. let cache = CacheManager::builder().vector_cache_size(1024).build(); @@ -378,7 +384,10 @@ mod tests { // Columns v1, k0 let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter()).unwrap(); assert_eq!([4, 1], mapper.column_ids()); - assert_eq!([4], mapper.batch_fields()); + assert_eq!( + [(4, ConcreteDataType::int64_datatype())], + mapper.batch_fields() + ); let batch = new_batch(0, &[1, 2], &[(4, 4)], 3); let record_batch = mapper.convert(&batch, None).unwrap(); diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index 5874f23da4..2df2958098 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -107,6 +107,10 @@ impl ColumnMetadata { pub fn decode_list(bytes: &[u8]) -> serde_json::Result> { serde_json::from_slice(bytes) } + + pub fn is_same_datatype(&self, other: &Self) -> bool { + self.column_schema.data_type == other.column_schema.data_type + } } #[cfg_attr(doc, aquamarine::aquamarine)]