diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 878932fbb4..7236c323c5 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -551,10 +551,10 @@ async fn test_region_usage() { let region_stat = region.region_usage().await; assert_eq!(region_stat.wal_usage, 0); - assert_eq!(region_stat.sst_usage, 3005); + assert_eq!(region_stat.sst_usage, 2962); // region total usage - assert_eq!(region_stat.disk_usage(), 4071); + assert_eq!(region_stat.disk_usage(), 4028); } #[tokio::test] diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 7d230981a9..468c1f8ed9 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -556,6 +556,9 @@ pub enum Error { second: Box, location: Location, }, + + #[snafu(display("Encode null value"))] + IndexEncodeNull { location: Location }, } pub type Result = std::result::Result; @@ -595,7 +598,8 @@ impl ErrorExt for Error { | InvalidParquet { .. } | OperateAbortedIndex { .. } | PuffinBlobTypeNotFound { .. } - | UnexpectedReplay { .. } => StatusCode::Unexpected, + | UnexpectedReplay { .. } + | IndexEncodeNull { .. } => StatusCode::Unexpected, RegionNotFound { .. } => StatusCode::RegionNotFound, ObjectStoreNotFound { .. } | InvalidScanIndex { .. } diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs index 29a8c8a63b..a83d237457 100644 --- a/src/mito2/src/row_converter.rs +++ b/src/mito2/src/row_converter.rs @@ -50,7 +50,7 @@ pub trait RowCodec { #[derive(Debug, Clone, PartialEq, Eq)] pub struct SortField { - data_type: ConcreteDataType, + pub(crate) data_type: ConcreteDataType, } impl SortField { diff --git a/src/mito2/src/sst/index/applier/builder.rs b/src/mito2/src/sst/index/applier/builder.rs index 070842544f..104085a7c5 100644 --- a/src/mito2/src/sst/index/applier/builder.rs +++ b/src/mito2/src/sst/index/applier/builder.rs @@ -190,7 +190,7 @@ impl<'a> SstIndexApplierBuilder<'a> { let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?; let mut bytes = vec![]; let field = SortField::new(data_type); - IndexValueCodec::encode_value(value.as_value_ref(), &field, &mut bytes)?; + IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)?; Ok(bytes) } } @@ -285,7 +285,7 @@ mod tests { pub(crate) fn encoded_string(s: impl Into) -> Vec { let mut bytes = vec![]; - IndexValueCodec::encode_value( + IndexValueCodec::encode_nonnull_value( Value::from(s.into()).as_value_ref(), &SortField::new(ConcreteDataType::string_datatype()), &mut bytes, @@ -296,7 +296,7 @@ mod tests { pub(crate) fn encoded_int64(s: impl Into) -> Vec { let mut bytes = vec![]; - IndexValueCodec::encode_value( + IndexValueCodec::encode_nonnull_value( Value::from(s.into()).as_value_ref(), &SortField::new(ConcreteDataType::int64_datatype()), &mut bytes, diff --git a/src/mito2/src/sst/index/codec.rs b/src/mito2/src/sst/index/codec.rs index 855cad82ac..0e238e9914 100644 --- a/src/mito2/src/sst/index/codec.rs +++ b/src/mito2/src/sst/index/codec.rs @@ -12,28 +12,48 @@ // See the License for the specific language governing permissions and // limitations under the License. +use datatypes::data_type::ConcreteDataType; use datatypes::value::{Value, ValueRef}; use memcomparable::Serializer; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::ColumnMetadata; -use crate::error::Result; +use crate::error::{FieldTypeMismatchSnafu, IndexEncodeNullSnafu, Result}; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; /// Encodes index values according to their data types for sorting and storage use. pub struct IndexValueCodec; impl IndexValueCodec { - /// Serializes a `ValueRef` using the data type defined in `SortField` and writes + /// Serializes a non-null `ValueRef` using the data type defined in `SortField` and writes /// the result into a buffer. /// + /// For `String` data types, we don't serialize it via memcomparable, but directly write the + /// bytes into the buffer, since we have to keep the original string for searching with regex. + /// /// # Arguments /// * `value` - The value to be encoded. /// * `field` - Contains data type to guide serialization. /// * `buffer` - Destination buffer for the serialized value. - pub fn encode_value(value: ValueRef, field: &SortField, buffer: &mut Vec) -> Result<()> { - buffer.reserve(field.estimated_size()); - let mut serializer = Serializer::new(buffer); - field.serialize(&mut serializer, &value) + pub fn encode_nonnull_value( + value: ValueRef, + field: &SortField, + buffer: &mut Vec, + ) -> Result<()> { + ensure!(!value.is_null(), IndexEncodeNullSnafu); + + if matches!(field.data_type, ConcreteDataType::String(_)) { + let value = value + .as_string() + .context(FieldTypeMismatchSnafu)? + .context(IndexEncodeNullSnafu)?; + buffer.extend_from_slice(value.as_bytes()); + Ok(()) + } else { + buffer.reserve(field.estimated_size()); + let mut serializer = Serializer::new(buffer); + field.serialize(&mut serializer, &value) + } } } @@ -106,7 +126,7 @@ mod tests { let field = SortField::new(ConcreteDataType::string_datatype()); let mut buffer = Vec::new(); - IndexValueCodec::encode_value(value, &field, &mut buffer).unwrap(); + IndexValueCodec::encode_nonnull_value(value, &field, &mut buffer).unwrap(); assert!(!buffer.is_empty()); } @@ -116,10 +136,20 @@ mod tests { let field = SortField::new(ConcreteDataType::int64_datatype()); let mut buffer = Vec::new(); - let res = IndexValueCodec::encode_value(value, &field, &mut buffer); + let res = IndexValueCodec::encode_nonnull_value(value, &field, &mut buffer); assert!(matches!(res, Err(Error::FieldTypeMismatch { .. }))); } + #[test] + fn test_encode_null_value() { + let value = ValueRef::Null; + let field = SortField::new(ConcreteDataType::string_datatype()); + + let mut buffer = Vec::new(); + let res = IndexValueCodec::encode_nonnull_value(value, &field, &mut buffer); + assert!(matches!(res, Err(Error::IndexEncodeNull { .. }))); + } + #[test] fn test_decode_primary_key_basic() { let tag_columns = vec![ diff --git a/src/mito2/src/sst/index/creator.rs b/src/mito2/src/sst/index/creator.rs index 6b85df9541..ab8de9d1d9 100644 --- a/src/mito2/src/sst/index/creator.rs +++ b/src/mito2/src/sst/index/creator.rs @@ -206,7 +206,11 @@ impl SstIndexCreator { if let Some(value) = value.as_ref() { self.value_buf.clear(); - IndexValueCodec::encode_value(value.as_value_ref(), field, &mut self.value_buf)?; + IndexValueCodec::encode_nonnull_value( + value.as_value_ref(), + field, + &mut self.value_buf, + )?; } // non-null value -> Some(encoded_bytes), null value -> None @@ -294,7 +298,315 @@ impl SstIndexCreator { #[cfg(test)] mod tests { - // TODO(zhongzc): This PR has grown quite large, and the SstIndexCreator deserves - // a significant number of unit tests. These unit tests are substantial enough to - // make up a large PR on their own. I will bring them in with the next PR. + use std::collections::BTreeSet; + use std::iter; + + use api::v1::SemanticType; + use datafusion_expr::{binary_expr, col, lit, Expr as DfExpr, Operator}; + use datatypes::data_type::ConcreteDataType; + use datatypes::schema::ColumnSchema; + use datatypes::value::ValueRef; + use datatypes::vectors::{UInt64Vector, UInt8Vector}; + use futures::future::BoxFuture; + use object_store::services::Memory; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use store_api::storage::RegionId; + + use super::*; + use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; + use crate::sst::index::applier::builder::SstIndexApplierBuilder; + use crate::sst::location; + + fn mock_object_store() -> ObjectStore { + ObjectStore::new(Memory::default()).unwrap().finish() + } + + fn mock_intm_mgr() -> IntermediateManager { + IntermediateManager::new(mock_object_store()) + } + + fn mock_region_metadata() -> RegionMetadataRef { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_str", + ConcreteDataType::string_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_i32", + ConcreteDataType::int32_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 3, + }) + .primary_key(vec![1, 2]); + + Arc::new(builder.build().unwrap()) + } + + fn new_batch(num_rows: usize, str_tag: impl AsRef, i32_tag: impl Into) -> Batch { + let fields = vec![ + SortField::new(ConcreteDataType::string_datatype()), + SortField::new(ConcreteDataType::int32_datatype()), + ]; + let codec = McmpRowCodec::new(fields); + let row: [ValueRef; 2] = [str_tag.as_ref().into(), i32_tag.into().into()]; + let primary_key = codec.encode(row.into_iter()).unwrap(); + + Batch::new( + primary_key, + Arc::new(UInt64Vector::from_iter_values( + iter::repeat(0).take(num_rows), + )), + Arc::new(UInt64Vector::from_iter_values( + iter::repeat(0).take(num_rows), + )), + Arc::new(UInt8Vector::from_iter_values( + iter::repeat(1).take(num_rows), + )), + vec![], + ) + .unwrap() + } + + async fn build_applier_factory( + tags: BTreeSet<(&'static str, i32)>, + ) -> impl Fn(DfExpr) -> BoxFuture<'static, Vec> { + let region_dir = "region0".to_string(); + let sst_file_id = FileId::random(); + let file_path = location::index_file_path(®ion_dir, sst_file_id); + let object_store = mock_object_store(); + let region_metadata = mock_region_metadata(); + let intm_mgr = mock_intm_mgr(); + let memory_threshold = None; + let segment_row_count = 2; + + let mut creator = SstIndexCreator::new( + file_path, + sst_file_id, + ®ion_metadata, + object_store.clone(), + intm_mgr, + memory_threshold, + NonZeroUsize::new(segment_row_count).unwrap(), + ); + + for (str_tag, i32_tag) in &tags { + let batch = new_batch(segment_row_count, str_tag, *i32_tag); + creator.update(&batch).await.unwrap(); + } + + let (row_count, _) = creator.finish().await.unwrap(); + assert_eq!(row_count, tags.len() * segment_row_count); + + move |expr| { + let applier = SstIndexApplierBuilder::new( + region_dir.clone(), + object_store.clone(), + None, + ®ion_metadata, + Default::default(), + ) + .build(&[expr.into()]) + .unwrap() + .unwrap(); + Box::pin(async move { + applier + .apply(sst_file_id) + .await + .unwrap() + .matched_segment_ids + .iter_ones() + .collect() + }) + } + } + + #[tokio::test] + async fn test_create_and_query_get_key() { + let tags = BTreeSet::from_iter([ + ("aaa", 1), + ("aaa", 2), + ("aaa", 3), + ("aab", 1), + ("aab", 2), + ("aab", 3), + ("abc", 1), + ("abc", 2), + ("abc", 3), + ]); + + let applier_factory = build_applier_factory(tags).await; + + let expr = col("tag_str").eq(lit("aaa")); + let res = applier_factory(expr).await; + assert_eq!(res, vec![0, 1, 2]); + + let expr = col("tag_i32").eq(lit(2)); + let res = applier_factory(expr).await; + assert_eq!(res, vec![1, 4, 7]); + + let expr = col("tag_str").eq(lit("aaa")).and(col("tag_i32").eq(lit(2))); + let res = applier_factory(expr).await; + assert_eq!(res, vec![1]); + + let expr = col("tag_str") + .eq(lit("aaa")) + .or(col("tag_str").eq(lit("abc"))); + let res = applier_factory(expr).await; + assert_eq!(res, vec![0, 1, 2, 6, 7, 8]); + + let expr = col("tag_str").in_list(vec![lit("aaa"), lit("abc")], false); + let res = applier_factory(expr).await; + assert_eq!(res, vec![0, 1, 2, 6, 7, 8]); + } + + #[tokio::test] + async fn test_create_and_query_range() { + let tags = BTreeSet::from_iter([ + ("aaa", 1), + ("aaa", 2), + ("aaa", 3), + ("aab", 1), + ("aab", 2), + ("aab", 3), + ("abc", 1), + ("abc", 2), + ("abc", 3), + ]); + + let applier_factory = build_applier_factory(tags).await; + + let expr = col("tag_str").between(lit("aaa"), lit("aab")); + let res = applier_factory(expr).await; + assert_eq!(res, vec![0, 1, 2, 3, 4, 5]); + + let expr = col("tag_i32").between(lit(2), lit(3)); + let res = applier_factory(expr).await; + assert_eq!(res, vec![1, 2, 4, 5, 7, 8]); + + let expr = col("tag_str").between(lit("aaa"), lit("aaa")); + let res = applier_factory(expr).await; + assert_eq!(res, vec![0, 1, 2]); + + let expr = col("tag_i32").between(lit(2), lit(2)); + let res = applier_factory(expr).await; + assert_eq!(res, vec![1, 4, 7]); + } + + #[tokio::test] + async fn test_create_and_query_comparison() { + let tags = BTreeSet::from_iter([ + ("aaa", 1), + ("aaa", 2), + ("aaa", 3), + ("aab", 1), + ("aab", 2), + ("aab", 3), + ("abc", 1), + ("abc", 2), + ("abc", 3), + ]); + + let applier_factory = build_applier_factory(tags).await; + + let expr = col("tag_str").lt(lit("aab")); + let res = applier_factory(expr).await; + assert_eq!(res, vec![0, 1, 2]); + + let expr = col("tag_i32").lt(lit(2)); + let res = applier_factory(expr).await; + assert_eq!(res, vec![0, 3, 6]); + + let expr = col("tag_str").gt(lit("aab")); + let res = applier_factory(expr).await; + assert_eq!(res, vec![6, 7, 8]); + + let expr = col("tag_i32").gt(lit(2)); + let res = applier_factory(expr).await; + assert_eq!(res, vec![2, 5, 8]); + + let expr = col("tag_str").lt_eq(lit("aab")); + let res = applier_factory(expr).await; + assert_eq!(res, vec![0, 1, 2, 3, 4, 5]); + + let expr = col("tag_i32").lt_eq(lit(2)); + let res = applier_factory(expr).await; + assert_eq!(res, vec![0, 1, 3, 4, 6, 7]); + + let expr = col("tag_str").gt_eq(lit("aab")); + let res = applier_factory(expr).await; + assert_eq!(res, vec![3, 4, 5, 6, 7, 8]); + + let expr = col("tag_i32").gt_eq(lit(2)); + let res = applier_factory(expr).await; + assert_eq!(res, vec![1, 2, 4, 5, 7, 8]); + + let expr = col("tag_str") + .gt(lit("aaa")) + .and(col("tag_str").lt(lit("abc"))); + let res = applier_factory(expr).await; + assert_eq!(res, vec![3, 4, 5]); + + let expr = col("tag_i32").gt(lit(1)).and(col("tag_i32").lt(lit(3))); + let res = applier_factory(expr).await; + assert_eq!(res, vec![1, 4, 7]); + } + + #[tokio::test] + async fn test_create_and_query_regex() { + let tags = BTreeSet::from_iter([ + ("aaa", 1), + ("aaa", 2), + ("aaa", 3), + ("aab", 1), + ("aab", 2), + ("aab", 3), + ("abc", 1), + ("abc", 2), + ("abc", 3), + ]); + + let applier_factory = build_applier_factory(tags).await; + + let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit(".*")); + let res = applier_factory(expr).await; + assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]); + + let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*c")); + let res = applier_factory(expr).await; + assert_eq!(res, vec![6, 7, 8]); + + let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("a.*b$")); + let res = applier_factory(expr).await; + assert_eq!(res, vec![3, 4, 5]); + + let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\w")); + let res = applier_factory(expr).await; + assert_eq!(res, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]); + + let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("\\d")); + let res = applier_factory(expr).await; + assert!(res.is_empty()); + + let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit("^aaa$")); + let res = applier_factory(expr).await; + assert_eq!(res, vec![0, 1, 2]); + } }