fix(index): encode string type to original data to enable fst regex to work (#3324)

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2024-02-19 18:52:19 +08:00
committed by GitHub
parent 4810c91a64
commit 40f43de27d
6 changed files with 365 additions and 19 deletions

View File

@@ -551,10 +551,10 @@ async fn test_region_usage() {
let region_stat = region.region_usage().await;
assert_eq!(region_stat.wal_usage, 0);
assert_eq!(region_stat.sst_usage, 3005);
assert_eq!(region_stat.sst_usage, 2962);
// region total usage
assert_eq!(region_stat.disk_usage(), 4071);
assert_eq!(region_stat.disk_usage(), 4028);
}
#[tokio::test]

View File

@@ -556,6 +556,9 @@ pub enum Error {
second: Box<Error>,
location: Location,
},
#[snafu(display("Encode null value"))]
IndexEncodeNull { location: Location },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -595,7 +598,8 @@ impl ErrorExt for Error {
| InvalidParquet { .. }
| OperateAbortedIndex { .. }
| PuffinBlobTypeNotFound { .. }
| UnexpectedReplay { .. } => StatusCode::Unexpected,
| UnexpectedReplay { .. }
| IndexEncodeNull { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
ObjectStoreNotFound { .. }
| InvalidScanIndex { .. }

View File

@@ -50,7 +50,7 @@ pub trait RowCodec {
/// Wraps the [`ConcreteDataType`] of a single field.
///
/// NOTE(review): presumably one `SortField` describes one column of an encoded row key —
/// confirm against the `RowCodec` implementations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
data_type: ConcreteDataType,
// Visible within the crate so that other modules can inspect the field's type directly
// (the index value codec matches on it to special-case strings).
pub(crate) data_type: ConcreteDataType,
}
impl SortField {

View File

@@ -190,7 +190,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
let mut bytes = vec![];
let field = SortField::new(data_type);
IndexValueCodec::encode_value(value.as_value_ref(), &field, &mut bytes)?;
IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)?;
Ok(bytes)
}
}
@@ -285,7 +285,7 @@ mod tests {
pub(crate) fn encoded_string(s: impl Into<String>) -> Vec<u8> {
let mut bytes = vec![];
IndexValueCodec::encode_value(
IndexValueCodec::encode_nonnull_value(
Value::from(s.into()).as_value_ref(),
&SortField::new(ConcreteDataType::string_datatype()),
&mut bytes,
@@ -296,7 +296,7 @@ mod tests {
pub(crate) fn encoded_int64(s: impl Into<i64>) -> Vec<u8> {
let mut bytes = vec![];
IndexValueCodec::encode_value(
IndexValueCodec::encode_nonnull_value(
Value::from(s.into()).as_value_ref(),
&SortField::new(ConcreteDataType::int64_datatype()),
&mut bytes,

View File

@@ -12,28 +12,48 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use datatypes::data_type::ConcreteDataType;
use datatypes::value::{Value, ValueRef};
use memcomparable::Serializer;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use crate::error::Result;
use crate::error::{FieldTypeMismatchSnafu, IndexEncodeNullSnafu, Result};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
/// Encodes index values according to their data types for sorting and storage use.
pub struct IndexValueCodec;
impl IndexValueCodec {
/// Serializes a `ValueRef` using the data type defined in `SortField` and writes
/// Serializes a non-null `ValueRef` using the data type defined in `SortField` and writes
/// the result into a buffer.
///
/// For `String` data types, we don't serialize it via memcomparable, but directly write the
/// bytes into the buffer, since we have to keep the original string for searching with regex.
///
/// # Arguments
/// * `value` - The value to be encoded.
/// * `field` - Contains data type to guide serialization.
/// * `buffer` - Destination buffer for the serialized value.
pub fn encode_value(value: ValueRef, field: &SortField, buffer: &mut Vec<u8>) -> Result<()> {
buffer.reserve(field.estimated_size());
let mut serializer = Serializer::new(buffer);
field.serialize(&mut serializer, &value)
pub fn encode_nonnull_value(
    value: ValueRef,
    field: &SortField,
    buffer: &mut Vec<u8>,
) -> Result<()> {
    // Nulls are rejected up front, whatever the field type is.
    ensure!(!value.is_null(), IndexEncodeNullSnafu);

    match &field.data_type {
        // Strings are appended verbatim instead of going through memcomparable.
        ConcreteDataType::String(_) => {
            let text = value
                .as_string()
                .context(FieldTypeMismatchSnafu)?
                .context(IndexEncodeNullSnafu)?;
            buffer.extend_from_slice(text.as_bytes());
            Ok(())
        }
        // Every other type is serialized by the field itself via memcomparable.
        _ => {
            buffer.reserve(field.estimated_size());
            let mut serializer = Serializer::new(buffer);
            field.serialize(&mut serializer, &value)
        }
    }
}
}
@@ -106,7 +126,7 @@ mod tests {
let field = SortField::new(ConcreteDataType::string_datatype());
let mut buffer = Vec::new();
IndexValueCodec::encode_value(value, &field, &mut buffer).unwrap();
IndexValueCodec::encode_nonnull_value(value, &field, &mut buffer).unwrap();
assert!(!buffer.is_empty());
}
@@ -116,10 +136,20 @@ mod tests {
let field = SortField::new(ConcreteDataType::int64_datatype());
let mut buffer = Vec::new();
let res = IndexValueCodec::encode_value(value, &field, &mut buffer);
let res = IndexValueCodec::encode_nonnull_value(value, &field, &mut buffer);
assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
}
#[test]
fn test_encode_null_value() {
    // Encoding a null must fail with `IndexEncodeNull`, here checked for a string field.
    let string_field = SortField::new(ConcreteDataType::string_datatype());
    let mut encoded = Vec::new();

    let outcome =
        IndexValueCodec::encode_nonnull_value(ValueRef::Null, &string_field, &mut encoded);

    assert!(matches!(outcome, Err(Error::IndexEncodeNull { .. })));
}
#[test]
fn test_decode_primary_key_basic() {
let tag_columns = vec![

View File

@@ -206,7 +206,11 @@ impl SstIndexCreator {
if let Some(value) = value.as_ref() {
self.value_buf.clear();
IndexValueCodec::encode_value(value.as_value_ref(), field, &mut self.value_buf)?;
IndexValueCodec::encode_nonnull_value(
value.as_value_ref(),
field,
&mut self.value_buf,
)?;
}
// non-null value -> Some(encoded_bytes), null value -> None
@@ -294,7 +298,315 @@ impl SstIndexCreator {
#[cfg(test)]
mod tests {
// TODO(zhongzc): This PR has grown quite large, and the SstIndexCreator deserves
// a significant number of unit tests. These unit tests are substantial enough to
// make up a large PR on their own. I will bring them in with the next PR.
use std::collections::BTreeSet;
use std::iter;
use api::v1::SemanticType;
use datafusion_expr::{binary_expr, col, lit, Expr as DfExpr, Operator};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::ValueRef;
use datatypes::vectors::{UInt64Vector, UInt8Vector};
use futures::future::BoxFuture;
use object_store::services::Memory;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use super::*;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
use crate::sst::location;
/// Creates an object store on top of the `Memory` service for use in tests.
fn mock_object_store() -> ObjectStore {
    let backend = Memory::default();
    ObjectStore::new(backend).unwrap().finish()
}
/// Creates an `IntermediateManager` that uses its own fresh mock object store.
fn mock_intm_mgr() -> IntermediateManager {
    let store = mock_object_store();
    IntermediateManager::new(store)
}
/// Builds region metadata with two tag columns (`tag_str`: string, `tag_i32`: int32) and a
/// millisecond timestamp column `ts`. The primary key consists of the two tag columns.
fn mock_region_metadata() -> RegionMetadataRef {
    // All mock columns are declared non-nullable.
    let column = |name: &str, data_type, semantic_type, column_id| ColumnMetadata {
        column_schema: ColumnSchema::new(name, data_type, false),
        semantic_type,
        column_id,
    };

    let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
    builder
        .push_column_metadata(column(
            "tag_str",
            ConcreteDataType::string_datatype(),
            SemanticType::Tag,
            1,
        ))
        .push_column_metadata(column(
            "tag_i32",
            ConcreteDataType::int32_datatype(),
            SemanticType::Tag,
            2,
        ))
        .push_column_metadata(column(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            SemanticType::Timestamp,
            3,
        ))
        .primary_key(vec![1, 2]);

    Arc::new(builder.build().unwrap())
}
/// Builds a `Batch` of `num_rows` rows whose primary key is the memcomparable encoding of
/// `(str_tag, i32_tag)`. The remaining vectors are filled with constants and there are no
/// field columns.
fn new_batch(num_rows: usize, str_tag: impl AsRef<str>, i32_tag: impl Into<i32>) -> Batch {
    let codec = McmpRowCodec::new(vec![
        SortField::new(ConcreteDataType::string_datatype()),
        SortField::new(ConcreteDataType::int32_datatype()),
    ]);
    let tag_values: [ValueRef; 2] = [str_tag.as_ref().into(), i32_tag.into().into()];
    let primary_key = codec.encode(tag_values.into_iter()).unwrap();

    // The two `u64` vectors are identical: `num_rows` zeros each.
    let zeros = || {
        Arc::new(UInt64Vector::from_iter_values(
            iter::repeat(0).take(num_rows),
        ))
    };
    let ones = Arc::new(UInt8Vector::from_iter_values(
        iter::repeat(1).take(num_rows),
    ));

    Batch::new(primary_key, zeros(), zeros(), ones, vec![]).unwrap()
}
/// Creates an index from `tags` and returns a factory that evaluates a predicate against it.
///
/// One batch of two rows is written per tag pair, with a segment size of two rows. The
/// returned closure builds an applier for the given expression and resolves to the ids of the
/// matched segments.
async fn build_applier_factory(
    tags: BTreeSet<(&'static str, i32)>,
) -> impl Fn(DfExpr) -> BoxFuture<'static, Vec<usize>> {
    // Rows per batch and rows per index segment are deliberately the same.
    const ROWS_PER_SEGMENT: usize = 2;

    let region_dir = String::from("region0");
    let file_id = FileId::random();
    let store = mock_object_store();
    let metadata = mock_region_metadata();

    let mut creator = SstIndexCreator::new(
        location::index_file_path(&region_dir, file_id),
        file_id,
        &metadata,
        store.clone(),
        mock_intm_mgr(),
        None,
        NonZeroUsize::new(ROWS_PER_SEGMENT).unwrap(),
    );

    for (str_tag, i32_tag) in &tags {
        let batch = new_batch(ROWS_PER_SEGMENT, str_tag, *i32_tag);
        creator.update(&batch).await.unwrap();
    }

    let (indexed_rows, _) = creator.finish().await.unwrap();
    assert_eq!(indexed_rows, tags.len() * ROWS_PER_SEGMENT);

    move |expr| {
        let builder = SstIndexApplierBuilder::new(
            region_dir.clone(),
            store.clone(),
            None,
            &metadata,
            Default::default(),
        );
        let applier = builder.build(&[expr.into()]).unwrap().unwrap();

        Box::pin(async move {
            let output = applier.apply(file_id).await.unwrap();
            output.matched_segment_ids.iter_ones().collect()
        })
    }
}
#[tokio::test]
async fn test_create_and_query_get_key() {
let tags = BTreeSet::from_iter([
("aaa", 1),
("aaa", 2),
("aaa", 3),
("aab", 1),
("aab", 2),
("aab", 3),
("abc", 1),
("abc", 2),
("abc", 3),
]);
let applier_factory = build_applier_factory(tags).await;
let expr = col("tag_str").eq(lit("aaa"));
let res = applier_factory(expr).await;
assert_eq!(res, vec![0, 1, 2]);
let expr = col("tag_i32").eq(lit(2));
let res = applier_factory(expr).await;
assert_eq!(res, vec![1, 4, 7]);
let expr = col("tag_str").eq(lit("aaa")).and(col("tag_i32").eq(lit(2)));
let res = applier_factory(expr).await;
assert_eq!(res, vec![1]);
let expr = col("tag_str")
.eq(lit("aaa"))
.or(col("tag_str").eq(lit("abc")));
let res = applier_factory(expr).await;
assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);
let expr = col("tag_str").in_list(vec![lit("aaa"), lit("abc")], false);
let res = applier_factory(expr).await;
assert_eq!(res, vec![0, 1, 2, 6, 7, 8]);
}
#[tokio::test]
async fn test_create_and_query_range() {
let tags = BTreeSet::from_iter([
("aaa", 1),
("aaa", 2),
("aaa", 3),
("aab", 1),
("aab", 2),
("aab", 3),
("abc", 1),
("abc", 2),
("abc", 3),
]);
let applier_factory = build_applier_factory(tags).await;
let expr = col("tag_str").between(lit("aaa"), lit("aab"));
let res = applier_factory(expr).await;
assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
let expr = col("tag_i32").between(lit(2), lit(3));
let res = applier_factory(expr).await;
assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);
let expr = col("tag_str").between(lit("aaa"), lit("aaa"));
let res = applier_factory(expr).await;
assert_eq!(res, vec![0, 1, 2]);
let expr = col("tag_i32").between(lit(2), lit(2));
let res = applier_factory(expr).await;
assert_eq!(res, vec![1, 4, 7]);
}
#[tokio::test]
async fn test_create_and_query_comparison() {
let tags = BTreeSet::from_iter([
("aaa", 1),
("aaa", 2),
("aaa", 3),
("aab", 1),
("aab", 2),
("aab", 3),
("abc", 1),
("abc", 2),
("abc", 3),
]);
let applier_factory = build_applier_factory(tags).await;
let expr = col("tag_str").lt(lit("aab"));
let res = applier_factory(expr).await;
assert_eq!(res, vec![0, 1, 2]);
let expr = col("tag_i32").lt(lit(2));
let res = applier_factory(expr).await;
assert_eq!(res, vec![0, 3, 6]);
let expr = col("tag_str").gt(lit("aab"));
let res = applier_factory(expr).await;
assert_eq!(res, vec![6, 7, 8]);
let expr = col("tag_i32").gt(lit(2));
let res = applier_factory(expr).await;
assert_eq!(res, vec![2, 5, 8]);
let expr = col("tag_str").lt_eq(lit("aab"));
let res = applier_factory(expr).await;
assert_eq!(res, vec![0, 1, 2, 3, 4, 5]);
let expr = col("tag_i32").lt_eq(lit(2));
let res = applier_factory(expr).await;
assert_eq!(res, vec![0, 1, 3, 4, 6, 7]);
let expr = col("tag_str").gt_eq(lit("aab"));
let res = applier_factory(expr).await;
assert_eq!(res, vec![3, 4, 5, 6, 7, 8]);
let expr = col("tag_i32").gt_eq(lit(2));
let res = applier_factory(expr).await;
assert_eq!(res, vec![1, 2, 4, 5, 7, 8]);
let expr = col("tag_str")
.gt(lit("aaa"))
.and(col("tag_str").lt(lit("abc")));
let res = applier_factory(expr).await;
assert_eq!(res, vec![3, 4, 5]);
let expr = col("tag_i32").gt(lit(1)).and(col("tag_i32").lt(lit(3)));
let res = applier_factory(expr).await;
assert_eq!(res, vec![1, 4, 7]);
}
#[tokio::test]
async fn test_create_and_query_regex() {
    // `build_applier_factory` writes one two-row batch per tag pair with a segment size of
    // two, so the pairs are expected to map to segments in sorted order:
    //   ("aaa", 1..=3) -> 0..=2, ("aab", 1..=3) -> 3..=5, ("abc", 1..=3) -> 6..=8.
    let tags: BTreeSet<(&'static str, i32)> = ["aaa", "aab", "abc"]
        .iter()
        .flat_map(|&tag| (1..=3).map(move |n| (tag, n)))
        .collect();
    let query = build_applier_factory(tags).await;

    // Each regex pattern applied to `tag_str`, paired with the segments it should match.
    let cases: Vec<(&str, Vec<usize>)> = vec![
        (".*", vec![0, 1, 2, 3, 4, 5, 6, 7, 8]),
        ("a.*c", vec![6, 7, 8]),
        ("a.*b$", vec![3, 4, 5]),
        ("\\w", vec![0, 1, 2, 3, 4, 5, 6, 7, 8]),
        // No tag contains a digit, so nothing may match.
        ("\\d", vec![]),
        ("^aaa$", vec![0, 1, 2]),
    ];

    for (pattern, expected) in cases {
        let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit(pattern));
        assert_eq!(query(expr).await, expected);
    }
}
}