feat: replace DensePrimaryKeyCodec with Arc<dyn PrimaryKeyCodec> (#5408)

* feat: use `PrimaryKeyCodec` trait object

* feat: introduce `RewritePrimaryKey`

* chore: apply suggestions from CR

* fix: fix clippy

* chore: add comments
Author: Weny Xu
Date: 2025-01-23 17:44:17 +09:00 (committed by GitHub)
Parent: 35b635f639
Commit: 05f21679d6
20 changed files with 635 additions and 208 deletions

View File

@@ -22,7 +22,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use crate::row_converter::DensePrimaryKeyCodec;
use crate::row_converter::{build_primary_key_codec, DensePrimaryKeyCodec};
use crate::sst::parquet::file_range::RangeBase;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::SimpleFilterContext;
@@ -41,7 +41,7 @@ impl BulkIterContext {
projection: &Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Self {
let codec = DensePrimaryKeyCodec::new(&region_metadata);
let codec = build_primary_key_codec(&region_metadata);
let simple_filters = predicate
.as_ref()
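
The call sites above stop constructing `DensePrimaryKeyCodec` directly and instead ask a factory for an `Arc<dyn PrimaryKeyCodec>` chosen by the region's primary key encoding. Below is a minimal, self-contained sketch of that pattern; the types and toy byte formats are stand-ins for illustration, not the crate's actual definitions.

```rust
use std::sync::Arc;

// Simplified stand-ins; the real trait has many more methods and real value types.
#[derive(Debug, PartialEq)]
enum PrimaryKeyEncoding {
    Dense,
    Sparse,
}

trait PrimaryKeyCodec: Send + Sync {
    fn encoding(&self) -> PrimaryKeyEncoding;
    fn encode(&self, values: &[(u32, i64)]) -> Vec<u8>;
}

struct DenseCodec;
struct SparseCodec;

impl PrimaryKeyCodec for DenseCodec {
    fn encoding(&self) -> PrimaryKeyEncoding {
        PrimaryKeyEncoding::Dense
    }
    fn encode(&self, values: &[(u32, i64)]) -> Vec<u8> {
        // Dense: values serialized in primary-key order, column ids stay implicit.
        values.iter().flat_map(|(_, v)| v.to_be_bytes()).collect()
    }
}

impl PrimaryKeyCodec for SparseCodec {
    fn encoding(&self) -> PrimaryKeyEncoding {
        PrimaryKeyEncoding::Sparse
    }
    fn encode(&self, values: &[(u32, i64)]) -> Vec<u8> {
        // Sparse: each value is prefixed with its column id.
        values
            .iter()
            .flat_map(|(id, v)| id.to_be_bytes().into_iter().chain(v.to_be_bytes()))
            .collect()
    }
}

/// Factory hiding the concrete codec behind a trait object, playing the role of
/// `build_primary_key_codec(&region_metadata)` in the diff.
fn build_codec(encoding: PrimaryKeyEncoding) -> Arc<dyn PrimaryKeyCodec> {
    match encoding {
        PrimaryKeyEncoding::Dense => Arc::new(DenseCodec),
        PrimaryKeyEncoding::Sparse => Arc::new(SparseCodec),
    }
}

fn main() {
    let codec = build_codec(PrimaryKeyEncoding::Sparse);
    assert_eq!(codec.encoding(), PrimaryKeyEncoding::Sparse);
    assert_eq!(codec.encode(&[(1, 42)]).len(), 12);
}
```

The same swap repeats in the memtable builders, the projection mapper, and the parquet reader further down in this diff.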

View File

@@ -562,7 +562,7 @@ mod tests {
let batch_values = batches
.into_iter()
.map(|b| {
let pk_values = pk_encoder.decode_dense(b.primary_key()).unwrap();
let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
let timestamps = b
.timestamps()
.as_any()

View File

@@ -31,9 +31,8 @@ use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use common_base::readable_size::ReadableSize;
pub(crate) use primary_key_filter::DensePrimaryKeyFilter;
pub(crate) use primary_key_filter::{DensePrimaryKeyFilter, SparsePrimaryKeyFilter};
use serde::{Deserialize, Serialize};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;
@@ -48,7 +47,7 @@ use crate::memtable::{
MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
};
use crate::region::options::MergeMode;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec};
use crate::row_converter::{build_primary_key_codec, PrimaryKeyCodec};
/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
@@ -330,22 +329,14 @@ impl PartitionTreeMemtableBuilder {
impl MemtableBuilder for PartitionTreeMemtableBuilder {
fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
match metadata.primary_key_encoding {
PrimaryKeyEncoding::Dense => {
let codec = Arc::new(DensePrimaryKeyCodec::new(metadata));
Arc::new(PartitionTreeMemtable::new(
id,
codec,
metadata.clone(),
self.write_buffer_manager.clone(),
&self.config,
))
}
PrimaryKeyEncoding::Sparse => {
//TODO(weny): Implement sparse primary key encoding.
todo!()
}
}
let codec = build_primary_key_codec(metadata);
Arc::new(PartitionTreeMemtable::new(
id,
codec,
metadata.clone(),
self.write_buffer_manager.clone(),
&self.config,
))
}
}
@@ -382,7 +373,7 @@ mod tests {
use store_api::storage::RegionId;
use super::*;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::row_converter::DensePrimaryKeyCodec;
use crate::test_util::memtable_util::{
self, collect_iter_timestamps, region_metadata_to_row_schema,
};
@@ -794,7 +785,7 @@ mod tests {
let mut reader = new_memtable.iter(None, None, None).unwrap();
let batch = reader.next().unwrap().unwrap();
let pk = codec.decode(batch.primary_key()).unwrap();
let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
if let Value::String(s) = &pk[2] {
assert_eq!("10min", s.as_utf8());
} else {

View File

@@ -96,6 +96,21 @@ impl PartitionTree {
}
}
fn verify_primary_key_length(&self, kv: &KeyValue) -> Result<()> {
// The sparse primary key codec does not have a fixed number of fields.
if let Some(expected_num_fields) = self.row_codec.num_fields() {
ensure!(
expected_num_fields == kv.num_primary_keys(),
PrimaryKeyLengthMismatchSnafu {
expect: expected_num_fields,
actual: kv.num_primary_keys(),
}
);
}
// TODO(weny): verify the primary key length for sparse primary key codec.
Ok(())
}
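
`num_fields` now returns `Option<usize>` because the sparse codec has no fixed arity, so the length check above only runs when the codec reports one. A small sketch of that guard follows; the error struct is a hypothetical stand-in for `PrimaryKeyLengthMismatchSnafu`.

```rust
/// Hypothetical error type standing in for the crate's snafu-based error.
#[derive(Debug)]
struct PrimaryKeyLengthMismatch {
    expect: usize,
    actual: usize,
}

/// Verify the number of primary key values only when the codec has a fixed arity.
/// A dense codec returns `Some(n)`; the sparse codec returns `None` and skips the check.
fn verify_primary_key_length(
    codec_num_fields: Option<usize>,
    num_primary_keys: usize,
) -> Result<(), PrimaryKeyLengthMismatch> {
    if let Some(expected) = codec_num_fields {
        if expected != num_primary_keys {
            return Err(PrimaryKeyLengthMismatch {
                expect: expected,
                actual: num_primary_keys,
            });
        }
    }
    Ok(())
}

fn main() {
    // Dense codec with three key columns rejects a row that only carries two values.
    let err = verify_primary_key_length(Some(3), 2).unwrap_err();
    println!("{err:?}");
    // Sparse codec reports no fixed arity, so any count passes this check.
    assert!(verify_primary_key_length(None, 2).is_ok());
}
```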
// TODO(yingwen): The size computed from values is inaccurate.
/// Write key-values into the tree.
///
@@ -110,13 +125,7 @@ impl PartitionTree {
let has_pk = !self.metadata.primary_key.is_empty();
for kv in kvs.iter() {
ensure!(
kv.num_primary_keys() == self.row_codec.num_fields(),
PrimaryKeyLengthMismatchSnafu {
expect: self.row_codec.num_fields(),
actual: kv.num_primary_keys(),
}
);
self.verify_primary_key_length(&kv)?;
// Safety: timestamp of kv must be both present and a valid timestamp value.
let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
metrics.min_ts = metrics.min_ts.min(ts);
@@ -161,13 +170,7 @@ impl PartitionTree {
) -> Result<()> {
let has_pk = !self.metadata.primary_key.is_empty();
ensure!(
kv.num_primary_keys() == self.row_codec.num_fields(),
PrimaryKeyLengthMismatchSnafu {
expect: self.row_codec.num_fields(),
actual: kv.num_primary_keys(),
}
);
self.verify_primary_key_length(&kv)?;
// Safety: timestamp of kv must be both present and a valid timestamp value.
let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value();
metrics.min_ts = metrics.min_ts.min(ts);

View File

@@ -51,7 +51,7 @@ use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::dedup::LastNonNullIter;
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::region::options::MergeMode;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
/// Initial vector builder capacity.
const INITIAL_BUILDER_CAPACITY: usize = 0;
@@ -146,12 +146,13 @@ impl TimeSeriesMemtable {
fn write_key_value(&self, kv: KeyValue, stats: &mut WriteMetrics) -> Result<()> {
ensure!(
kv.num_primary_keys() == self.row_codec.num_fields(),
self.row_codec.num_fields() == kv.num_primary_keys(),
PrimaryKeyLengthMismatchSnafu {
expect: self.row_codec.num_fields(),
actual: kv.num_primary_keys()
actual: kv.num_primary_keys(),
}
);
let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
let fields = kv.fields().collect::<Vec<_>>();
@@ -585,7 +586,7 @@ fn prune_primary_key(
let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
pk_values
} else {
let pk_values = codec.decode(pk);
let pk_values = codec.decode_dense_without_column_id(pk);
if let Err(e) = pk_values {
error!(e; "Failed to decode primary key");
return true;
@@ -1176,7 +1177,12 @@ mod tests {
let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
schema
.primary_key_columns()
.map(|c| SortField::new(c.column_schema.data_type.clone()))
.map(|c| {
(
c.column_id,
SortField::new(c.column_schema.data_type.clone()),
)
})
.collect(),
));
let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));
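
Field definitions passed to `with_fields` are now `(ColumnId, SortField)` pairs, so decoded values can be tagged with the column they belong to, while `decode_dense_without_column_id` keeps the plain positional form for callers such as `prune_primary_key`. A simplified sketch of the two decode flavors, using fixed-width i64 fields instead of the memcomparable encoding:

```rust
type ColumnId = u32;

/// Simplified codec: every key column is a big-endian i64, purely for illustration.
struct DenseCodec {
    fields: Vec<ColumnId>, // column id per primary-key position
}

impl DenseCodec {
    /// Decode values paired with their column ids (the new `decode_dense`).
    fn decode_dense(&self, bytes: &[u8]) -> Vec<(ColumnId, i64)> {
        self.fields
            .iter()
            .zip(bytes.chunks_exact(8))
            .map(|(id, chunk)| (*id, i64::from_be_bytes(chunk.try_into().unwrap())))
            .collect()
    }

    /// Decode positional values only (the `decode_dense_without_column_id` variant).
    fn decode_dense_without_column_id(&self, bytes: &[u8]) -> Vec<i64> {
        bytes
            .chunks_exact(8)
            .take(self.fields.len())
            .map(|chunk| i64::from_be_bytes(chunk.try_into().unwrap()))
            .collect()
    }
}

fn main() {
    let codec = DenseCodec { fields: vec![11, 12] };
    let mut key = Vec::new();
    key.extend_from_slice(&1i64.to_be_bytes());
    key.extend_from_slice(&2i64.to_be_bytes());
    assert_eq!(codec.decode_dense(&key), vec![(11, 1), (12, 2)]);
    assert_eq!(codec.decode_dense_without_column_id(&key), vec![1, 2]);
}
```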

View File

@@ -40,7 +40,7 @@ use datatypes::arrow::compute::SortOptions;
use datatypes::arrow::row::{RowConverter, SortField};
use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
use datatypes::types::TimestampType;
use datatypes::value::{Value, ValueRef};
use datatypes::value::ValueRef;
use datatypes::vectors::{
BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, UInt8Vector,
@@ -58,6 +58,7 @@ use crate::error::{
use crate::memtable::BoxedBatchIterator;
use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED};
use crate::read::prune::PruneReader;
use crate::row_converter::CompositeValues;
/// Storage internal representation of a batch of rows for a primary key (time series).
///
@@ -68,7 +69,7 @@ pub struct Batch {
/// Primary key encoded in a comparable form.
primary_key: Vec<u8>,
/// Possibly decoded `primary_key` values. Some places would decode it in advance.
pk_values: Option<Vec<Value>>,
pk_values: Option<CompositeValues>,
/// Timestamps of rows, should be sorted and not null.
timestamps: VectorRef,
/// Sequences of rows
@@ -114,12 +115,12 @@ impl Batch {
}
/// Returns possibly decoded primary-key values.
pub fn pk_values(&self) -> Option<&[Value]> {
self.pk_values.as_deref()
pub fn pk_values(&self) -> Option<&CompositeValues> {
self.pk_values.as_ref()
}
/// Sets possibly decoded primary-key values.
pub fn set_pk_values(&mut self, pk_values: Vec<Value>) {
pub fn set_pk_values(&mut self, pk_values: CompositeValues) {
self.pk_values = Some(pk_values);
}
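
`Batch` now caches the decoded key as a `CompositeValues`, so a primary key is decoded at most once per batch and later consumers reuse the cached form. A minimal sketch of that decode-once pattern, with `Vec<(u32, i64)>` standing in for `CompositeValues`; the `pk_values_or_decode` helper is hypothetical, the real call sites (for example `RangeBase` and `RewritePrimaryKey`) do the same check inline.

```rust
/// Simplified stand-in for `CompositeValues`.
type PkValues = Vec<(u32, i64)>;

struct Batch {
    primary_key: Vec<u8>,
    /// Possibly decoded `primary_key` values; some readers decode them in advance.
    pk_values: Option<PkValues>,
}

impl Batch {
    fn pk_values(&self) -> Option<&PkValues> {
        self.pk_values.as_ref()
    }

    fn set_pk_values(&mut self, pk_values: PkValues) {
        self.pk_values = Some(pk_values);
    }

    /// Return cached values, decoding (and caching) them on first use.
    fn pk_values_or_decode(&mut self, decode: impl Fn(&[u8]) -> PkValues) -> &PkValues {
        if self.pk_values.is_none() {
            let decoded = decode(&self.primary_key);
            self.set_pk_values(decoded);
        }
        // Safety: populated above if it was None.
        self.pk_values.as_ref().unwrap()
    }
}

fn main() {
    let mut batch = Batch {
        primary_key: vec![0, 0, 0, 7],
        pk_values: None,
    };
    let values = batch.pk_values_or_decode(|pk| vec![(1, pk[3] as i64)]);
    assert_eq!(values, &vec![(1, 7)]);
    assert!(batch.pk_values().is_some());
}
```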

View File

@@ -15,6 +15,7 @@
//! Utilities to adapt readers with different schema.
use std::collections::HashMap;
use std::sync::Arc;
use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
@@ -26,7 +27,10 @@ use store_api::storage::ColumnId;
use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, Result};
use crate::read::projection::ProjectionMapper;
use crate::read::{Batch, BatchColumn, BatchReader};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, SortField};
use crate::row_converter::{
build_primary_key_codec, build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec,
SortField,
};
/// Reader to adapt schema of underlying reader to expected schema.
pub struct CompatReader<R> {
@@ -68,6 +72,8 @@ impl<R: BatchReader> BatchReader for CompatReader<R> {
/// A helper struct to adapt schema of the batch to an expected schema.
pub(crate) struct CompatBatch {
/// Optional primary key adapter.
rewrite_pk: Option<RewritePrimaryKey>,
/// Optional primary key adapter.
compat_pk: Option<CompatPrimaryKey>,
/// Optional fields adapter.
@@ -79,10 +85,12 @@ impl CompatBatch {
/// - `mapper` is built from the metadata users expect to see.
/// - `reader_meta` is the metadata of the input reader.
pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result<Self> {
let rewrite_pk = may_rewrite_primary_key(mapper.metadata(), &reader_meta);
let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
let compat_fields = may_compat_fields(mapper, &reader_meta)?;
Ok(Self {
rewrite_pk,
compat_pk,
compat_fields,
})
@@ -90,6 +98,9 @@ impl CompatBatch {
/// Adapts the `batch` to the expected schema.
pub(crate) fn compat_batch(&self, mut batch: Batch) -> Result<Batch> {
if let Some(rewrite_pk) = &self.rewrite_pk {
batch = rewrite_pk.compat(batch)?;
}
if let Some(compat_pk) = &self.compat_pk {
batch = compat_pk.compat(batch)?;
}
@@ -101,10 +112,15 @@ impl CompatBatch {
}
}
/// Returns true if `left` and `right` have same columns to read.
///
/// It only consider column ids.
pub(crate) fn has_same_columns(left: &RegionMetadata, right: &RegionMetadata) -> bool {
/// Returns true if `left` and `right` have same columns and primary key encoding.
pub(crate) fn has_same_columns_and_pk_encoding(
left: &RegionMetadata,
right: &RegionMetadata,
) -> bool {
if left.primary_key_encoding != right.primary_key_encoding {
return false;
}
if left.column_metadatas.len() != right.column_metadatas.len() {
return false;
}
@@ -127,16 +143,17 @@ pub(crate) fn has_same_columns(left: &RegionMetadata, right: &RegionMetadata) ->
#[derive(Debug)]
struct CompatPrimaryKey {
/// Row converter to append values to primary keys.
converter: DensePrimaryKeyCodec,
converter: Arc<dyn PrimaryKeyCodec>,
/// Default values to append.
values: Vec<Value>,
values: Vec<(ColumnId, Value)>,
}
impl CompatPrimaryKey {
/// Make primary key of the `batch` compatible.
fn compat(&self, mut batch: Batch) -> Result<Batch> {
let mut buffer =
Vec::with_capacity(batch.primary_key().len() + self.converter.estimated_size());
let mut buffer = Vec::with_capacity(
batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(),
);
buffer.extend_from_slice(batch.primary_key());
self.converter.encode_values(&self.values, &mut buffer)?;
@@ -144,9 +161,7 @@ impl CompatPrimaryKey {
// update cache
if let Some(pk_values) = &mut batch.pk_values {
for value in &self.values {
pk_values.push(value.clone());
}
pk_values.extend(&self.values);
}
Ok(batch)
@@ -211,6 +226,25 @@ impl CompatFields {
}
}
fn may_rewrite_primary_key(
expect: &RegionMetadata,
actual: &RegionMetadata,
) -> Option<RewritePrimaryKey> {
if expect.primary_key_encoding == actual.primary_key_encoding {
return None;
}
let fields = expect.primary_key.clone();
let original = build_primary_key_codec(actual);
let new = build_primary_key_codec(expect);
Some(RewritePrimaryKey {
original,
new,
fields,
})
}
/// Creates a [CompatPrimaryKey] if needed.
fn may_compat_primary_key(
expect: &RegionMetadata,
@@ -248,7 +282,10 @@ fn may_compat_primary_key(
for column_id in to_add {
// Safety: The id comes from expect region metadata.
let column = expect.column_by_id(*column_id).unwrap();
fields.push(SortField::new(column.column_schema.data_type.clone()));
fields.push((
*column_id,
SortField::new(column.column_schema.data_type.clone()),
));
let default_value = column
.column_schema
.create_default()
@@ -263,9 +300,11 @@ fn may_compat_primary_key(
column.column_schema.name
),
})?;
values.push(default_value);
values.push((*column_id, default_value));
}
let converter = DensePrimaryKeyCodec::with_fields(fields);
// Use the expected primary key encoding to build the converter.
let converter =
build_primary_key_codec_with_fields(expect.primary_key_encoding, fields.into_iter());
Ok(Some(CompatPrimaryKey { converter, values }))
}
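
When the expected metadata adds tag columns that the file on disk lacks, `CompatPrimaryKey` appends each missing column's default value, now tagged with its `ColumnId`, behind the existing key bytes. A rough sketch of that append step under a toy fixed-width encoding; the helper types here are illustrative, not the crate's.

```rust
type ColumnId = u32;

/// Simplified stand-in for the codec: encodes each value as a big-endian i64.
struct Codec;

impl Codec {
    fn encode_values(&self, values: &[(ColumnId, i64)], buffer: &mut Vec<u8>) {
        for (_, v) in values {
            buffer.extend_from_slice(&v.to_be_bytes());
        }
    }
}

/// Appends default values for key columns the reader's file lacks,
/// mirroring `CompatPrimaryKey::compat` in the diff.
struct CompatPrimaryKey {
    converter: Codec,
    /// Default value per added key column, tagged with its column id.
    values: Vec<(ColumnId, i64)>,
}

impl CompatPrimaryKey {
    fn compat(&self, mut primary_key: Vec<u8>) -> Vec<u8> {
        // Reserve room for the appended defaults, then encode them after the
        // existing key bytes so the key keeps its original ordering prefix.
        primary_key.reserve(self.values.len() * 8);
        self.converter.encode_values(&self.values, &mut primary_key);
        primary_key
    }
}

fn main() {
    let compat = CompatPrimaryKey {
        converter: Codec,
        values: vec![(3, 0)], // new tag column 3 defaults to 0
    };
    let old_key = 1i64.to_be_bytes().to_vec();
    let new_key = compat.compat(old_key);
    assert_eq!(new_key.len(), 16);
}
```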
@@ -350,6 +389,53 @@ enum IndexOrDefault {
},
}
/// Adapter to rewrite primary key.
struct RewritePrimaryKey {
/// Original primary key codec.
original: Arc<dyn PrimaryKeyCodec>,
/// New primary key codec.
new: Arc<dyn PrimaryKeyCodec>,
/// Order of the fields in the new primary key.
fields: Vec<ColumnId>,
}
impl RewritePrimaryKey {
/// Make primary key of the `batch` compatible.
fn compat(&self, mut batch: Batch) -> Result<Batch> {
let values = if let Some(pk_values) = batch.pk_values() {
pk_values
} else {
let new_pk_values = self.original.decode(batch.primary_key())?;
batch.set_pk_values(new_pk_values);
// Safety: We ensure pk_values is not None.
batch.pk_values().as_ref().unwrap()
};
let mut buffer = Vec::with_capacity(
batch.primary_key().len() + self.new.estimated_size().unwrap_or_default(),
);
match values {
CompositeValues::Dense(values) => {
self.new.encode_values(values.as_slice(), &mut buffer)?;
}
CompositeValues::Sparse(values) => {
let values = self
.fields
.iter()
.map(|id| {
let value = values.get_or_null(*id);
(*id, value.as_value_ref())
})
.collect::<Vec<_>>();
self.new.encode_value_refs(&values, &mut buffer)?;
}
}
batch.set_primary_key(buffer);
Ok(batch)
}
}
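
`RewritePrimaryKey` handles the remaining mismatch: the file and the expected metadata use different primary key encodings, so the key is decoded with the original codec and re-encoded with the new one, pulling sparse values back into the expected column order. A compact sketch of that decode/re-encode round trip follows; the closures and toy byte formats stand in for the two trait-object codecs.

```rust
use std::collections::HashMap;

type ColumnId = u32;

/// Simplified composite values: dense keeps key order, sparse is keyed by column id.
enum CompositeValues {
    Dense(Vec<(ColumnId, i64)>),
    Sparse(HashMap<ColumnId, i64>),
}

/// Core of the `RewritePrimaryKey` adapter: decode with the original codec,
/// put values into the order the new encoding expects, then re-encode.
/// `decode_original` and `encode_new` stand in for the two trait-object codecs.
fn rewrite_primary_key(
    key: &[u8],
    fields: &[ColumnId],
    decode_original: impl Fn(&[u8]) -> CompositeValues,
    encode_new: impl Fn(&[(ColumnId, i64)], &mut Vec<u8>),
) -> Vec<u8> {
    let mut buffer = Vec::new();
    match decode_original(key) {
        // Dense values are already in primary-key order: re-encode as-is.
        CompositeValues::Dense(values) => encode_new(values.as_slice(), &mut buffer),
        // Sparse values must be pulled out in the expected column order;
        // absent columns fall back to a null-like default (0 here).
        CompositeValues::Sparse(values) => {
            let ordered: Vec<_> = fields
                .iter()
                .map(|id| (*id, values.get(id).copied().unwrap_or(0)))
                .collect();
            encode_new(ordered.as_slice(), &mut buffer);
        }
    }
    buffer
}

fn main() {
    // Original file was written with a dense key of columns 1 and 3.
    let decode_dense = |bytes: &[u8]| {
        let values = [1u32, 3]
            .iter()
            .zip(bytes.chunks_exact(8))
            .map(|(id, c)| (*id, i64::from_be_bytes(c.try_into().unwrap())))
            .collect();
        CompositeValues::Dense(values)
    };
    // New encoding is "sparse": each value prefixed by its column id.
    let encode_sparse = |values: &[(ColumnId, i64)], buf: &mut Vec<u8>| {
        for (id, v) in values {
            buf.extend_from_slice(&id.to_be_bytes());
            buf.extend_from_slice(&v.to_be_bytes());
        }
    };
    let mut dense_key = Vec::new();
    dense_key.extend_from_slice(&7i64.to_be_bytes());
    dense_key.extend_from_slice(&9i64.to_be_bytes());
    let rewritten = rewrite_primary_key(&dense_key, &[1, 3], decode_dense, encode_sparse);
    assert_eq!(rewritten.len(), 24); // two (u32 id + i64 value) pairs

    // And the reverse direction: a sparse key rewritten back into the dense layout.
    let decode_sparse = |bytes: &[u8]| {
        let map = bytes
            .chunks_exact(12)
            .map(|c| {
                (
                    u32::from_be_bytes(c[..4].try_into().unwrap()),
                    i64::from_be_bytes(c[4..].try_into().unwrap()),
                )
            })
            .collect();
        CompositeValues::Sparse(map)
    };
    let encode_dense = |values: &[(ColumnId, i64)], buf: &mut Vec<u8>| {
        for (_, v) in values {
            buf.extend_from_slice(&v.to_be_bytes());
        }
    };
    let dense_again = rewrite_primary_key(&rewritten, &[1, 3], decode_sparse, encode_dense);
    assert_eq!(dense_again, dense_key);
}
```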
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -359,11 +445,12 @@ mod tests {
use datatypes::schema::ColumnSchema;
use datatypes::value::ValueRef;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use super::*;
use crate::row_converter::PrimaryKeyCodecExt;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec};
use crate::test_util::{check_reader_result, VecBatchReader};
/// Creates a new [RegionMetadata].
@@ -396,7 +483,7 @@ mod tests {
/// Encode primary key.
fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
let fields = (0..keys.len())
.map(|_| SortField::new(ConcreteDataType::string_datatype()))
.map(|_| (0, SortField::new(ConcreteDataType::string_datatype())))
.collect();
let converter = DensePrimaryKeyCodec::with_fields(fields);
let row = keys.iter().map(|str_opt| match str_opt {
@@ -407,6 +494,24 @@ mod tests {
converter.encode(row).unwrap()
}
/// Encode sparse primary key.
fn encode_sparse_key(keys: &[(ColumnId, Option<&str>)]) -> Vec<u8> {
let fields = (0..keys.len())
.map(|_| (1, SortField::new(ConcreteDataType::string_datatype())))
.collect();
let converter = SparsePrimaryKeyCodec::with_fields(fields);
let row = keys
.iter()
.map(|(id, str_opt)| match str_opt {
Some(v) => (*id, ValueRef::String(v)),
None => (*id, ValueRef::Null),
})
.collect::<Vec<_>>();
let mut buffer = vec![];
converter.encode_value_refs(&row, &mut buffer).unwrap();
buffer
}
/// Creates a batch for specific primary `key`.
///
/// `fields`: [(column_id of the field, is null)]
@@ -526,6 +631,25 @@ mod tests {
.is_none());
}
#[test]
fn test_same_pk_encoding() {
let reader_meta = Arc::new(new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
],
&[1],
));
assert!(may_compat_primary_key(&reader_meta, &reader_meta)
.unwrap()
.is_none());
}
#[test]
fn test_same_fields() {
let reader_meta = Arc::new(new_metadata(
@@ -747,4 +871,58 @@ mod tests {
)
.await;
}
#[tokio::test]
async fn test_compat_reader_different_pk_encoding() {
let mut reader_meta = new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[1],
);
reader_meta.primary_key_encoding = PrimaryKeyEncoding::Dense;
let reader_meta = Arc::new(reader_meta);
let mut expect_meta = new_metadata(
&[
(
0,
SemanticType::Timestamp,
ConcreteDataType::timestamp_millisecond_datatype(),
),
(1, SemanticType::Tag, ConcreteDataType::string_datatype()),
(2, SemanticType::Field, ConcreteDataType::int64_datatype()),
(3, SemanticType::Tag, ConcreteDataType::string_datatype()),
(4, SemanticType::Field, ConcreteDataType::int64_datatype()),
],
&[1, 3],
);
expect_meta.primary_key_encoding = PrimaryKeyEncoding::Sparse;
let expect_meta = Arc::new(expect_meta);
let mapper = ProjectionMapper::all(&expect_meta).unwrap();
let k1 = encode_key(&[Some("a")]);
let k2 = encode_key(&[Some("b")]);
let source_reader = VecBatchReader::new(&[
new_batch(&k1, &[(2, false)], 1000, 3),
new_batch(&k2, &[(2, false)], 1000, 3),
]);
let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
let k1 = encode_sparse_key(&[(1, Some("a")), (3, None)]);
let k2 = encode_sparse_key(&[(1, Some("b")), (3, None)]);
check_reader_result(
&mut compat_reader,
&[
new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
],
)
.await;
}
}

View File

@@ -33,7 +33,7 @@ use store_api::storage::ColumnId;
use crate::cache::CacheStrategy;
use crate::error::{InvalidRequestSnafu, Result};
use crate::read::Batch;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec};
use crate::row_converter::{build_primary_key_codec, CompositeValues, PrimaryKeyCodec};
/// Only cache vector when its length `<=` this value.
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
@@ -47,7 +47,7 @@ pub struct ProjectionMapper {
/// Output record batch contains tags.
has_tags: bool,
/// Decoder for primary key.
codec: DensePrimaryKeyCodec,
codec: Arc<dyn PrimaryKeyCodec>,
/// Schema for converted [RecordBatch].
output_schema: SchemaRef,
/// Ids of columns to project. It keeps ids in the same order as the `projection`
@@ -92,8 +92,8 @@ impl ProjectionMapper {
// Safety: idx is valid.
column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
}
let codec = DensePrimaryKeyCodec::new(metadata);
let codec = build_primary_key_codec(metadata);
if is_empty_projection {
// If projection is empty, we don't output any column.
return Ok(ProjectionMapper {
@@ -134,7 +134,7 @@ impl ProjectionMapper {
has_tags = true;
// We always read all primary key so the column always exists and the tag
// index is always valid.
BatchIndex::Tag(index)
BatchIndex::Tag((index, column.column_id))
}
SemanticType::Timestamp => BatchIndex::Timestamp,
SemanticType::Field => {
@@ -213,15 +213,15 @@ impl ProjectionMapper {
// Skips decoding pk if we don't need to output it.
let pk_values = if self.has_tags {
match batch.pk_values() {
Some(v) => v.to_vec(),
Some(v) => v.clone(),
None => self
.codec
.decode_dense(batch.primary_key())
.decode(batch.primary_key())
.map_err(BoxedError::new)
.context(ExternalSnafu)?,
}
} else {
Vec::new()
CompositeValues::Dense(vec![])
};
let mut columns = Vec::with_capacity(self.output_schema.num_columns());
@@ -232,8 +232,11 @@ impl ProjectionMapper {
.zip(self.output_schema.column_schemas())
{
match index {
BatchIndex::Tag(idx) => {
let value = &pk_values[*idx];
BatchIndex::Tag((idx, column_id)) => {
let value = match &pk_values {
CompositeValues::Dense(v) => &v[*idx].1,
CompositeValues::Sparse(v) => v.get_or_null(*column_id),
};
let vector = repeated_vector_with_cache(
&column_schema.data_type,
value,
@@ -259,7 +262,7 @@ impl ProjectionMapper {
#[derive(Debug, Clone, Copy)]
enum BatchIndex {
/// Index in primary keys.
Tag(usize),
Tag((usize, ColumnId)),
/// The time index column.
Timestamp,
/// Index in fields.
@@ -321,7 +324,7 @@ mod tests {
use super::*;
use crate::cache::CacheManager;
use crate::read::BatchBuilder;
use crate::row_converter::{PrimaryKeyCodecExt, SortField};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use crate::test_util::meta_util::TestRegionMetadataBuilder;
fn new_batch(
@@ -332,7 +335,12 @@ mod tests {
) -> Batch {
let converter = DensePrimaryKeyCodec::with_fields(
(0..tags.len())
.map(|_| SortField::new(ConcreteDataType::int64_datatype()))
.map(|idx| {
(
idx as u32,
SortField::new(ConcreteDataType::int64_datatype()),
)
})
.collect(),
);
let primary_key = converter

View File

@@ -767,7 +767,7 @@ impl ScanInput {
}
}
};
if !compat::has_same_columns(
if !compat::has_same_columns_and_pk_encoding(
self.mapper.metadata(),
file_range_ctx.read_format().metadata(),
) {

View File

@@ -13,10 +13,8 @@
// limitations under the License.
mod dense;
// TODO(weny): remove it.
#[allow(unused)]
mod sparse;
use std::fmt::Debug;
use std::sync::Arc;
use common_recordbatch::filter::SimpleFilterEvaluator;
@@ -24,7 +22,8 @@ use datatypes::value::{Value, ValueRef};
pub use dense::{DensePrimaryKeyCodec, SortField};
pub use sparse::{SparsePrimaryKeyCodec, SparseValues};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
@@ -49,9 +48,6 @@ pub trait PrimaryKeyCodecExt {
fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
where
I: Iterator<Item = ValueRef<'a>>;
/// Decode row values from bytes.
fn decode(&self, bytes: &[u8]) -> Result<Vec<Value>>;
}
pub trait PrimaryKeyFilter: Send + Sync {
@@ -59,15 +55,63 @@ pub trait PrimaryKeyFilter: Send + Sync {
fn matches(&mut self, pk: &[u8]) -> bool;
}
pub trait PrimaryKeyCodec: Send + Sync {
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CompositeValues {
Dense(Vec<(ColumnId, Value)>),
Sparse(SparseValues),
}
impl CompositeValues {
/// Extends the composite values with the given values.
pub fn extend(&mut self, values: &[(ColumnId, Value)]) {
match self {
CompositeValues::Dense(dense_values) => {
for (column_id, value) in values {
dense_values.push((*column_id, value.clone()));
}
}
CompositeValues::Sparse(sparse_values) => {
for (column_id, value) in values {
sparse_values.insert(*column_id, value.clone());
}
}
}
}
}
#[cfg(test)]
impl CompositeValues {
pub fn into_sparse(self) -> SparseValues {
match self {
CompositeValues::Sparse(v) => v,
_ => panic!("CompositeValues is not sparse"),
}
}
pub fn into_dense(self) -> Vec<Value> {
match self {
CompositeValues::Dense(v) => v.into_iter().map(|(_, v)| v).collect(),
_ => panic!("CompositeValues is not dense"),
}
}
}
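
Consumers of `CompositeValues` dispatch on the variant: dense values are addressed by position and carry their column id, while sparse values are looked up by column id with a null fallback (`get_or_null`). The same match appears in the projection mapper above and in the SST range pruning and the indexers below. A small sketch of that lookup, with `Option<i64>` standing in for `Value` and null:

```rust
use std::collections::HashMap;

type ColumnId = u32;

/// Simplified stand-in for `CompositeValues`; `None` plays the role of `Value::Null`.
enum CompositeValues {
    Dense(Vec<(ColumnId, Option<i64>)>),
    Sparse(HashMap<ColumnId, i64>),
}

/// Fetch the value of one tag column the way the projection mapper and the
/// indexers do after this change: by position for dense keys, by id for sparse keys.
fn tag_value(values: &CompositeValues, pk_index: usize, column_id: ColumnId) -> Option<i64> {
    match values {
        // Dense: the primary key index is known from the region metadata.
        CompositeValues::Dense(v) => v[pk_index].1,
        // Sparse: missing columns decode to null (`get_or_null` in the real code).
        CompositeValues::Sparse(v) => v.get(&column_id).copied(),
    }
}

fn main() {
    let dense = CompositeValues::Dense(vec![(1, Some(10)), (3, None)]);
    assert_eq!(tag_value(&dense, 0, 1), Some(10));
    assert_eq!(tag_value(&dense, 1, 3), None);

    let sparse = CompositeValues::Sparse(HashMap::from([(1, 10)]));
    assert_eq!(tag_value(&sparse, 0, 1), Some(10));
    assert_eq!(tag_value(&sparse, 0, 3), None); // absent column behaves as null
}
```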
pub trait PrimaryKeyCodec: Send + Sync + Debug {
/// Encodes a key value to bytes.
fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()>;
/// Encodes values to bytes.
fn encode_values(&self, values: &[Value], buffer: &mut Vec<u8>) -> Result<()>;
fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()>;
/// Encodes values to bytes.
fn encode_value_refs(
&self,
values: &[(ColumnId, ValueRef)],
buffer: &mut Vec<u8>,
) -> Result<()>;
/// Returns the number of fields in the primary key.
fn num_fields(&self) -> usize;
fn num_fields(&self) -> Option<usize>;
/// Returns a primary key filter factory.
fn primary_key_filter(
@@ -86,9 +130,33 @@ pub trait PrimaryKeyCodec: Send + Sync {
/// Decodes the primary key from the given bytes.
///
/// Returns a [`Vec<Value>`] that follows the primary key ordering.
fn decode_dense(&self, bytes: &[u8]) -> Result<Vec<Value>>;
/// Returns a [`CompositeValues`] that follows the primary key ordering.
fn decode(&self, bytes: &[u8]) -> Result<CompositeValues>;
/// Decode the leftmost value from bytes.
fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>>;
}
/// Builds a primary key codec from region metadata.
pub fn build_primary_key_codec(region_metadata: &RegionMetadata) -> Arc<dyn PrimaryKeyCodec> {
let fields = region_metadata.primary_key_columns().map(|col| {
(
col.column_id,
SortField::new(col.column_schema.data_type.clone()),
)
});
build_primary_key_codec_with_fields(region_metadata.primary_key_encoding, fields)
}
/// Builds a primary key codec from region metadata.
pub fn build_primary_key_codec_with_fields(
encoding: PrimaryKeyEncoding,
fields: impl Iterator<Item = (ColumnId, SortField)>,
) -> Arc<dyn PrimaryKeyCodec> {
match encoding {
PrimaryKeyEncoding::Dense => Arc::new(DensePrimaryKeyCodec::with_fields(fields.collect())),
PrimaryKeyEncoding::Sparse => {
Arc::new(SparsePrimaryKeyCodec::with_fields(fields.collect()))
}
}
}
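
The reworked `PrimaryKeyCodec` keeps only object-safe methods (slices in, owned values out), which is what makes `Arc<dyn PrimaryKeyCodec>` possible, while generic helpers such as encoding from an iterator stay on `PrimaryKeyCodecExt` and are only available on the concrete codecs. A minimal sketch of that core-trait/extension-trait split with simplified signatures:

```rust
use std::sync::Arc;

/// Object-safe core: every method is dispatchable through `dyn`.
trait Codec: Send + Sync {
    fn encode_values(&self, values: &[i64], buffer: &mut Vec<u8>);
    fn num_fields(&self) -> Option<usize>;
}

/// Extension trait for generic conveniences that would make the core trait
/// non-object-safe (a generic `I: Iterator` parameter cannot be called on `dyn Codec`).
trait CodecExt: Codec {
    fn encode_to_vec<I>(&self, row: I) -> Vec<u8>
    where
        I: Iterator<Item = i64>,
    {
        let values: Vec<i64> = row.collect();
        let mut buffer = Vec::new();
        self.encode_values(&values, &mut buffer);
        buffer
    }
}

struct DenseCodec {
    num_fields: usize,
}

impl Codec for DenseCodec {
    fn encode_values(&self, values: &[i64], buffer: &mut Vec<u8>) {
        for v in values {
            buffer.extend_from_slice(&v.to_be_bytes());
        }
    }
    fn num_fields(&self) -> Option<usize> {
        Some(self.num_fields)
    }
}

// Only concrete codecs get the generic helpers.
impl CodecExt for DenseCodec {}

fn main() {
    // Trait-object call sites only need the object-safe surface.
    let codec: Arc<dyn Codec> = Arc::new(DenseCodec { num_fields: 2 });
    let mut buf = Vec::new();
    codec.encode_values(&[1, 2], &mut buf);
    assert_eq!(buf.len(), 16);

    // Call sites holding the concrete type can still use the iterator helper.
    let concrete = DenseCodec { num_fields: 2 };
    assert_eq!(concrete.encode_to_vec([1i64, 2].into_iter()).len(), 16);
}
```

This is also why `decode` was dropped from `PrimaryKeyCodecExt`: the object-safe `decode` returning `CompositeValues` on the core trait now covers both encodings.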

View File

@@ -30,8 +30,9 @@ use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use super::PrimaryKeyFilter;
use super::{CompositeValues, PrimaryKeyFilter};
use crate::error::{
self, FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, SerializeFieldSnafu,
};
@@ -312,34 +313,31 @@ impl PrimaryKeyCodecExt for DensePrimaryKeyCodec {
{
self.encode_dense(row, buffer)
}
fn decode(&self, bytes: &[u8]) -> Result<Vec<Value>> {
self.decode_dense(bytes)
}
}
/// A memory-comparable row [`Value`] encoder/decoder.
#[derive(Clone, Debug)]
pub struct DensePrimaryKeyCodec {
/// Primary key fields.
ordered_primary_key_columns: Arc<Vec<SortField>>,
ordered_primary_key_columns: Arc<Vec<(ColumnId, SortField)>>,
}
impl DensePrimaryKeyCodec {
pub fn new(metadata: &RegionMetadata) -> Self {
let ordered_primary_key_columns = Arc::new(
metadata
.primary_key_columns()
.map(|c| SortField::new(c.column_schema.data_type.clone()))
.collect::<Vec<_>>(),
);
let ordered_primary_key_columns = metadata
.primary_key_columns()
.map(|c| {
(
c.column_id,
SortField::new(c.column_schema.data_type.clone()),
)
})
.collect::<Vec<_>>();
Self {
ordered_primary_key_columns,
}
Self::with_fields(ordered_primary_key_columns)
}
pub fn with_fields(fields: Vec<SortField>) -> Self {
pub fn with_fields(fields: Vec<(ColumnId, SortField)>) -> Self {
Self {
ordered_primary_key_columns: Arc::new(fields),
}
@@ -350,12 +348,42 @@ impl DensePrimaryKeyCodec {
I: Iterator<Item = ValueRef<'a>>,
{
let mut serializer = Serializer::new(buffer);
for (value, field) in row.zip(self.ordered_primary_key_columns.iter()) {
for (value, (_, field)) in row.zip(self.ordered_primary_key_columns.iter()) {
field.serialize(&mut serializer, &value)?;
}
Ok(())
}
/// Decode primary key values from bytes.
pub fn decode_dense(&self, bytes: &[u8]) -> Result<Vec<(ColumnId, Value)>> {
let mut deserializer = Deserializer::new(bytes);
let mut values = Vec::with_capacity(self.ordered_primary_key_columns.len());
for (column_id, field) in self.ordered_primary_key_columns.iter() {
let value = field.deserialize(&mut deserializer)?;
values.push((*column_id, value));
}
Ok(values)
}
/// Decode primary key values from bytes without column id.
pub fn decode_dense_without_column_id(&self, bytes: &[u8]) -> Result<Vec<Value>> {
let mut deserializer = Deserializer::new(bytes);
let mut values = Vec::with_capacity(self.ordered_primary_key_columns.len());
for (_, field) in self.ordered_primary_key_columns.iter() {
let value = field.deserialize(&mut deserializer)?;
values.push(value);
}
Ok(values)
}
/// Returns the field at `pos`.
///
/// # Panics
/// Panics if `pos` is out of bounds.
fn field_at(&self, pos: usize) -> &SortField {
&self.ordered_primary_key_columns[pos].1
}
/// Decode value at `pos` in `bytes`.
///
/// The i-th element in offsets buffer is how many bytes to skip in order to read value at `pos`.
@@ -370,7 +398,7 @@ impl DensePrimaryKeyCodec {
// We computed the offset before.
let to_skip = offsets_buf[pos];
deserializer.advance(to_skip);
return self.ordered_primary_key_columns[pos].deserialize(&mut deserializer);
return self.field_at(pos).deserialize(&mut deserializer);
}
if offsets_buf.is_empty() {
@@ -379,7 +407,8 @@ impl DensePrimaryKeyCodec {
for i in 0..pos {
// Offset to skip before reading value i.
offsets_buf.push(offset);
let skip = self.ordered_primary_key_columns[i]
let skip = self
.field_at(i)
.skip_deserialize(bytes, &mut deserializer)?;
offset += skip;
}
@@ -393,7 +422,8 @@ impl DensePrimaryKeyCodec {
deserializer.advance(offset);
for i in value_start..pos {
// Skip value i.
let skip = self.ordered_primary_key_columns[i]
let skip = self
.field_at(i)
.skip_deserialize(bytes, &mut deserializer)?;
// Offset for the value at i + 1.
offset += skip;
@@ -401,15 +431,19 @@ impl DensePrimaryKeyCodec {
}
}
self.ordered_primary_key_columns[pos].deserialize(&mut deserializer)
self.field_at(pos).deserialize(&mut deserializer)
}
pub fn estimated_size(&self) -> usize {
self.ordered_primary_key_columns
.iter()
.map(|f| f.estimated_size())
.map(|(_, f)| f.estimated_size())
.sum()
}
pub fn num_fields(&self) -> usize {
self.ordered_primary_key_columns.len()
}
}
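
The new `field_at` helper centralizes access to the `(ColumnId, SortField)` pairs used by `decode_value_at`, which memoizes byte offsets so repeated positional lookups on the same key do not re-skip earlier fields. A simplified illustration of that memoization with fixed-width fields; the real codec measures each field's encoded length via `skip_deserialize` instead.

```rust
/// Simplified field: in the real codec this is a `SortField` whose encoded
/// width is discovered while skipping; here it is a fixed byte width.
struct Field {
    width: usize,
}

struct Codec {
    fields: Vec<Field>,
}

impl Codec {
    /// Decode (here: just slice out) the value at `pos`.
    ///
    /// `offsets_buf[i]` caches how many bytes precede value `i`, so later calls
    /// for the same key can jump straight to positions measured earlier.
    fn value_at<'a>(&self, bytes: &'a [u8], pos: usize, offsets_buf: &mut Vec<usize>) -> &'a [u8] {
        if offsets_buf.is_empty() {
            offsets_buf.push(0);
        }
        // Extend the cache up to `pos`, measuring only fields not seen before.
        while offsets_buf.len() <= pos {
            let i = offsets_buf.len() - 1;
            let next = offsets_buf[i] + self.fields[i].width;
            offsets_buf.push(next);
        }
        let start = offsets_buf[pos];
        &bytes[start..start + self.fields[pos].width]
    }
}

fn main() {
    let codec = Codec {
        fields: vec![Field { width: 4 }, Field { width: 8 }, Field { width: 2 }],
    };
    let key = vec![0u8; 14];
    let mut offsets = Vec::new();
    assert_eq!(codec.value_at(&key, 2, &mut offsets).len(), 2);
    // Offsets for positions 0..=2 are now cached; a second lookup skips the measuring loop.
    assert_eq!(offsets, vec![0, 4, 12]);
    assert_eq!(codec.value_at(&key, 1, &mut offsets).len(), 8);
}
```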
impl PrimaryKeyCodec for DensePrimaryKeyCodec {
@@ -417,16 +451,25 @@ impl PrimaryKeyCodec for DensePrimaryKeyCodec {
self.encode_dense(key_value.primary_keys(), buffer)
}
fn encode_values(&self, values: &[Value], buffer: &mut Vec<u8>) -> Result<()> {
self.encode_dense(values.iter().map(|v| v.as_value_ref()), buffer)
fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()> {
self.encode_dense(values.iter().map(|(_, v)| v.as_value_ref()), buffer)
}
fn encode_value_refs(
&self,
values: &[(ColumnId, ValueRef)],
buffer: &mut Vec<u8>,
) -> Result<()> {
let iter = values.iter().map(|(_, v)| *v);
self.encode_dense(iter, buffer)
}
fn estimated_size(&self) -> Option<usize> {
Some(self.estimated_size())
}
fn num_fields(&self) -> usize {
self.ordered_primary_key_columns.len()
fn num_fields(&self) -> Option<usize> {
Some(self.num_fields())
}
fn encoding(&self) -> PrimaryKeyEncoding {
@@ -445,20 +488,14 @@ impl PrimaryKeyCodec for DensePrimaryKeyCodec {
))
}
fn decode_dense(&self, bytes: &[u8]) -> Result<Vec<Value>> {
let mut deserializer = Deserializer::new(bytes);
let mut values = Vec::with_capacity(self.ordered_primary_key_columns.len());
for f in self.ordered_primary_key_columns.iter() {
let value = f.deserialize(&mut deserializer)?;
values.push(value);
}
Ok(values)
fn decode(&self, bytes: &[u8]) -> Result<CompositeValues> {
Ok(CompositeValues::Dense(self.decode_dense(bytes)?))
}
fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
// TODO(weny, yinwen): avoid decoding the whole primary key.
let mut values = self.decode_dense(bytes)?;
Ok(values.pop())
Ok(values.pop().map(|(_, v)| v))
}
}
@@ -476,14 +513,14 @@ mod tests {
let encoder = DensePrimaryKeyCodec::with_fields(
data_types
.iter()
.map(|t| SortField::new(t.clone()))
.map(|t| (0, SortField::new(t.clone())))
.collect::<Vec<_>>(),
);
let value_ref = row.iter().map(|v| v.as_value_ref()).collect::<Vec<_>>();
let result = encoder.encode(value_ref.iter().cloned()).unwrap();
let decoded = encoder.decode(&result).unwrap();
let decoded = encoder.decode(&result).unwrap().into_dense();
assert_eq!(decoded, row);
let mut decoded = Vec::new();
let mut offsets = Vec::new();
@@ -502,14 +539,14 @@ mod tests {
#[test]
fn test_memcmp() {
let encoder = DensePrimaryKeyCodec::with_fields(vec![
SortField::new(ConcreteDataType::string_datatype()),
SortField::new(ConcreteDataType::int64_datatype()),
(0, SortField::new(ConcreteDataType::string_datatype())),
(1, SortField::new(ConcreteDataType::int64_datatype())),
]);
let values = [Value::String("abcdefgh".into()), Value::Int64(128)];
let value_ref = values.iter().map(|v| v.as_value_ref()).collect::<Vec<_>>();
let result = encoder.encode(value_ref.iter().cloned()).unwrap();
let decoded = encoder.decode(&result).unwrap();
let decoded = encoder.decode(&result).unwrap().into_dense();
assert_eq!(&values, &decoded as &[Value]);
}

View File

@@ -15,25 +15,30 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use common_recordbatch::filter::SimpleFilterEvaluator;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{Value, ValueRef};
use memcomparable::{Deserializer, Serializer};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::ColumnId;
use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu};
use crate::error::{DeserializeFieldSnafu, Result, SerializeFieldSnafu, UnsupportedOperationSnafu};
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::SparsePrimaryKeyFilter;
use crate::row_converter::dense::SortField;
use crate::row_converter::PrimaryKeyCodec;
use crate::row_converter::{CompositeValues, PrimaryKeyCodec, PrimaryKeyFilter};
/// A codec for sparse key of metrics.
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct SparsePrimaryKeyCodec {
inner: Arc<SparsePrimaryKeyCodecInner>,
}
#[derive(Debug)]
struct SparsePrimaryKeyCodecInner {
// Internal fields
table_id_field: SortField,
@@ -66,6 +71,11 @@ impl SparseValues {
self.values.get(&column_id).unwrap_or(&Value::Null)
}
/// Returns the value of the given column, or [`None`] if the column is not present.
pub fn get(&self, column_id: &ColumnId) -> Option<&Value> {
self.values.get(column_id)
}
/// Inserts a new value into the [`SparseValues`].
pub fn insert(&mut self, column_id: ColumnId, value: Value) {
self.values.insert(column_id, value);
@@ -111,6 +121,17 @@ impl SparsePrimaryKeyCodec {
}
}
pub fn with_fields(fields: Vec<(ColumnId, SortField)>) -> Self {
Self {
inner: Arc::new(SparsePrimaryKeyCodecInner {
columns: Some(fields.iter().map(|f| f.0).collect()),
table_id_field: SortField::new(ConcreteDataType::uint32_datatype()),
tsid_field: SortField::new(ConcreteDataType::uint64_datatype()),
label_field: SortField::new(ConcreteDataType::string_datatype()),
}),
}
}
/// Returns the field of the given column id.
fn get_field(&self, column_id: ColumnId) -> Option<&SortField> {
// if the `columns` is not specified, all unknown columns is primary key(label field).
@@ -224,6 +245,59 @@ impl SparsePrimaryKeyCodec {
}
}
impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
fn encode_key_value(&self, _key_value: &KeyValue, _buffer: &mut Vec<u8>) -> Result<()> {
UnsupportedOperationSnafu {
err_msg: "The encode_key_value method is not supported in SparsePrimaryKeyCodec.",
}
.fail()
}
fn encode_values(&self, values: &[(ColumnId, Value)], buffer: &mut Vec<u8>) -> Result<()> {
self.encode_to_vec(values.iter().map(|v| (v.0, v.1.as_value_ref())), buffer)
}
fn encode_value_refs(
&self,
values: &[(ColumnId, ValueRef)],
buffer: &mut Vec<u8>,
) -> Result<()> {
self.encode_to_vec(values.iter().map(|v| (v.0, v.1)), buffer)
}
fn estimated_size(&self) -> Option<usize> {
None
}
fn num_fields(&self) -> Option<usize> {
None
}
fn encoding(&self) -> PrimaryKeyEncoding {
PrimaryKeyEncoding::Sparse
}
fn primary_key_filter(
&self,
metadata: &RegionMetadataRef,
filters: Arc<Vec<SimpleFilterEvaluator>>,
) -> Box<dyn PrimaryKeyFilter> {
Box::new(SparsePrimaryKeyFilter::new(
metadata.clone(),
filters,
self.clone(),
))
}
fn decode(&self, bytes: &[u8]) -> Result<CompositeValues> {
Ok(CompositeValues::Sparse(self.decode_sparse(bytes)?))
}
fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
self.decode_leftmost(bytes)
}
}
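
The sparse codec can still implement the shared trait even though some operations do not apply to it: the size and arity hints return `None`, and `encode_key_value` reports an unsupported-operation error instead of the `todo!()` path the memtable builder used to hit. A small sketch of that shape with a hypothetical error enum:

```rust
/// Hypothetical error type; the real code uses an `UnsupportedOperationSnafu` variant.
#[derive(Debug)]
enum CodecError {
    UnsupportedOperation(&'static str),
}

trait Codec {
    fn encode_key_value(&self, key: &str, buffer: &mut Vec<u8>) -> Result<(), CodecError>;
    /// `None` means the codec cannot give a useful estimate.
    fn estimated_size(&self) -> Option<usize>;
    /// `None` means the codec has no fixed number of key fields.
    fn num_fields(&self) -> Option<usize>;
}

struct SparseCodec;

impl Codec for SparseCodec {
    fn encode_key_value(&self, _key: &str, _buffer: &mut Vec<u8>) -> Result<(), CodecError> {
        // Sparse keys are encoded from (column id, value) pairs elsewhere;
        // this key-value path is simply not supported.
        Err(CodecError::UnsupportedOperation(
            "encode_key_value is not supported by the sparse codec",
        ))
    }
    fn estimated_size(&self) -> Option<usize> {
        None
    }
    fn num_fields(&self) -> Option<usize> {
        None
    }
}

fn main() {
    let codec = SparseCodec;
    if let Err(e) = codec.encode_key_value("k", &mut Vec::new()) {
        println!("{e:?}");
    }
    assert_eq!(codec.num_fields(), None);
    assert_eq!(codec.estimated_size(), None);
}
```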
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@@ -30,7 +30,7 @@ use crate::error::{
PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result,
};
use crate::read::Batch;
use crate::row_converter::SortField;
use crate::row_converter::{CompositeValues, SortField};
use crate::sst::file::FileId;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec};
@@ -108,7 +108,10 @@ impl BloomFilterIndexer {
return Ok(None);
}
let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns());
let codec = IndexValuesCodec::from_tag_columns(
metadata.primary_key_encoding,
metadata.primary_key_columns(),
);
let indexer = Self {
creators,
temp_file_provider,
@@ -192,11 +195,26 @@ impl BloomFilterIndexer {
let n = batch.num_rows();
guard.inc_row_count(n);
// TODO(weny, zhenchi): lazy decode
let values = self.codec.decode(batch.primary_key())?;
// Tags
for ((col_id, _), field, value) in self.codec.decode(batch.primary_key())? {
for (idx, (col_id, field)) in self.codec.fields().iter().enumerate() {
let Some(creator) = self.creators.get_mut(col_id) else {
continue;
};
let value = match &values {
CompositeValues::Dense(vec) => {
let value = &vec[idx].1;
if value.is_null() {
None
} else {
Some(value)
}
}
CompositeValues::Sparse(sparse_values) => sparse_values.get(col_id),
};
let elems = value
.map(|v| {
let mut buf = vec![];
@@ -411,7 +429,7 @@ pub(crate) mod tests {
}
pub fn new_batch(str_tag: impl AsRef<str>, u64_field: impl IntoIterator<Item = u64>) -> Batch {
let fields = vec![SortField::new(ConcreteDataType::string_datatype())];
let fields = vec![(0, SortField::new(ConcreteDataType::string_datatype()))];
let codec = DensePrimaryKeyCodec::with_fields(fields);
let row: [ValueRef; 1] = [str_tag.as_ref().into()];
let primary_key = codec.encode(row.into_iter()).unwrap();

View File

@@ -12,15 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use datatypes::data_type::ConcreteDataType;
use datatypes::value::{Value, ValueRef};
use datatypes::value::ValueRef;
use memcomparable::Serializer;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::ColumnMetadata;
use store_api::storage::ColumnId;
use crate::error::{FieldTypeMismatchSnafu, IndexEncodeNullSnafu, Result};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, SortField};
use crate::row_converter::{
build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec, SortField,
};
/// Encodes index values according to their data types for sorting and storage use.
pub struct IndexValueCodec;
@@ -62,26 +68,35 @@ impl IndexValueCodec {
pub struct IndexValuesCodec {
/// Tuples containing column id and its corresponding index_name (result of `to_string` on ColumnId),
/// to minimize redundant `to_string` calls.
column_ids: Vec<(ColumnId, String)>,
column_ids: HashMap<ColumnId, String>,
/// The data types of tag columns.
fields: Vec<SortField>,
fields: Vec<(ColumnId, SortField)>,
/// The decoder for the primary key.
decoder: DensePrimaryKeyCodec,
decoder: Arc<dyn PrimaryKeyCodec>,
}
impl IndexValuesCodec {
/// Creates a new `IndexValuesCodec` from a list of `ColumnMetadata` of tag columns.
pub fn from_tag_columns<'a>(tag_columns: impl Iterator<Item = &'a ColumnMetadata>) -> Self {
pub fn from_tag_columns<'a>(
primary_key_encoding: PrimaryKeyEncoding,
tag_columns: impl Iterator<Item = &'a ColumnMetadata>,
) -> Self {
let (column_ids, fields): (Vec<_>, Vec<_>) = tag_columns
.map(|column| {
(
(column.column_id, column.column_id.to_string()),
SortField::new(column.column_schema.data_type.clone()),
(
column.column_id,
SortField::new(column.column_schema.data_type.clone()),
),
)
})
.unzip();
let decoder = DensePrimaryKeyCodec::with_fields(fields.clone());
let column_ids = column_ids.into_iter().collect();
let decoder =
build_primary_key_codec_with_fields(primary_key_encoding, fields.clone().into_iter());
Self {
column_ids,
fields,
@@ -89,26 +104,19 @@ impl IndexValuesCodec {
}
}
/// Returns the column ids of the index.
pub fn column_ids(&self) -> &HashMap<ColumnId, String> {
&self.column_ids
}
/// Returns the fields of the index.
pub fn fields(&self) -> &[(ColumnId, SortField)] {
&self.fields
}
/// Decodes a primary key into its corresponding column ids, data types and values.
pub fn decode(
&self,
primary_key: &[u8],
) -> Result<impl Iterator<Item = (&(ColumnId, String), &SortField, Option<Value>)>> {
let values = self.decoder.decode_dense(primary_key)?;
let iter = values
.into_iter()
.zip(&self.column_ids)
.zip(&self.fields)
.map(|((value, column_id), encoder)| {
if value.is_null() {
(column_id, encoder, None)
} else {
(column_id, encoder, Some(value))
}
});
Ok(iter)
pub fn decode(&self, primary_key: &[u8]) -> Result<CompositeValues> {
self.decoder.decode(primary_key)
}
}
@@ -116,10 +124,12 @@ impl IndexValuesCodec {
mod tests {
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use store_api::metadata::ColumnMetadata;
use super::*;
use crate::error::Error;
use crate::row_converter::{PrimaryKeyCodecExt, SortField};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
#[test]
fn test_encode_value_basic() {
@@ -167,27 +177,18 @@ mod tests {
];
let primary_key = DensePrimaryKeyCodec::with_fields(vec![
SortField::new(ConcreteDataType::string_datatype()),
SortField::new(ConcreteDataType::int64_datatype()),
(0, SortField::new(ConcreteDataType::string_datatype())),
(1, SortField::new(ConcreteDataType::int64_datatype())),
])
.encode([ValueRef::Null, ValueRef::Int64(10)].into_iter())
.unwrap();
let codec = IndexValuesCodec::from_tag_columns(tag_columns.iter());
let mut iter = codec.decode(&primary_key).unwrap();
let codec =
IndexValuesCodec::from_tag_columns(PrimaryKeyEncoding::Dense, tag_columns.iter());
let values = codec.decode(&primary_key).unwrap().into_dense();
let ((column_id, col_id_str), field, value) = iter.next().unwrap();
assert_eq!(*column_id, 1);
assert_eq!(col_id_str, "1");
assert_eq!(field, &SortField::new(ConcreteDataType::string_datatype()));
assert_eq!(value, None);
let ((column_id, col_id_str), field, value) = iter.next().unwrap();
assert_eq!(*column_id, 2);
assert_eq!(col_id_str, "2");
assert_eq!(field, &SortField::new(ConcreteDataType::int64_datatype()));
assert_eq!(value, Some(Value::Int64(10)));
assert!(iter.next().is_none());
assert_eq!(values.len(), 2);
assert_eq!(values[0], Value::Null);
assert_eq!(values[1], Value::Int64(10));
}
}

View File

@@ -34,7 +34,7 @@ use crate::error::{
PushIndexValueSnafu, Result,
};
use crate::read::Batch;
use crate::row_converter::SortField;
use crate::row_converter::{CompositeValues, SortField};
use crate::sst::file::FileId;
use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec};
use crate::sst::index::intermediate::{
@@ -101,7 +101,10 @@ impl InvertedIndexer {
);
let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));
let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns());
let codec = IndexValuesCodec::from_tag_columns(
metadata.primary_key_encoding,
metadata.primary_key_columns(),
);
Self {
codec,
index_creator,
@@ -180,11 +183,25 @@ impl InvertedIndexer {
let n = batch.num_rows();
guard.inc_row_count(n);
for ((col_id, col_id_str), field, value) in self.codec.decode(batch.primary_key())? {
// TODO(weny, zhenchi): lazy decode
let values = self.codec.decode(batch.primary_key())?;
for (idx, (col_id, field)) in self.codec.fields().iter().enumerate() {
if !self.indexed_column_ids.contains(col_id) {
continue;
}
let value = match &values {
CompositeValues::Dense(vec) => {
let value = &vec[idx].1;
if value.is_null() {
None
} else {
Some(value)
}
}
CompositeValues::Sparse(sparse_values) => sparse_values.get(col_id),
};
if let Some(value) = value.as_ref() {
self.value_buf.clear();
IndexValueCodec::encode_nonnull_value(
@@ -194,6 +211,9 @@ impl InvertedIndexer {
)?;
}
// Safety: the column id is guaranteed to be in the map
let col_id_str = self.codec.column_ids().get(col_id).unwrap();
// non-null value -> Some(encoded_bytes), null value -> None
let value = value.is_some().then_some(self.value_buf.as_slice());
self.index_creator
@@ -381,8 +401,8 @@ mod tests {
u64_field: impl IntoIterator<Item = u64>,
) -> Batch {
let fields = vec![
SortField::new(ConcreteDataType::string_datatype()),
SortField::new(ConcreteDataType::int32_datatype()),
(0, SortField::new(ConcreteDataType::string_datatype())),
(1, SortField::new(ConcreteDataType::int32_datatype())),
];
let codec = DensePrimaryKeyCodec::with_fields(fields);
let row: [ValueRef; 2] = [str_tag.as_ref().into(), i32_tag.into().into()];

View File

@@ -33,7 +33,7 @@ use crate::read::compat::CompatBatch;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::prune::PruneReader;
use crate::read::Batch;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::row_converter::{CompositeValues, PrimaryKeyCodec};
use crate::sst::file::FileHandle;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext};
@@ -156,7 +156,7 @@ impl FileRangeContext {
reader_builder: RowGroupReaderBuilder,
filters: Vec<SimpleFilterContext>,
read_format: ReadFormat,
codec: DensePrimaryKeyCodec,
codec: Arc<dyn PrimaryKeyCodec>,
) -> Self {
Self {
reader_builder,
@@ -237,7 +237,7 @@ pub(crate) struct RangeBase {
/// Helper to read the SST.
pub(crate) read_format: ReadFormat,
/// Decoder for primary keys
pub(crate) codec: DensePrimaryKeyCodec,
pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
/// Optional helper to compat batches.
pub(crate) compat_batch: Option<CompatBatch>,
}
@@ -264,15 +264,25 @@ impl RangeBase {
input.set_pk_values(self.codec.decode(input.primary_key())?);
input.pk_values().unwrap()
};
// Safety: this is a primary key
let pk_index = self
.read_format
.metadata()
.primary_key_index(filter.column_id())
.unwrap();
let pk_value = pk_values[pk_index]
.try_to_scalar_value(filter.data_type())
.context(FieldTypeMismatchSnafu)?;
let pk_value = match pk_values {
CompositeValues::Dense(v) => {
// Safety: this is a primary key
let pk_index = self
.read_format
.metadata()
.primary_key_index(filter.column_id())
.unwrap();
v[pk_index]
.1
.try_to_scalar_value(filter.data_type())
.context(FieldTypeMismatchSnafu)?
}
CompositeValues::Sparse(v) => {
let v = v.get_or_null(filter.column_id());
v.try_to_scalar_value(filter.data_type())
.context(FieldTypeMismatchSnafu)?
}
};
if filter
.filter()
.evaluate_scalar(&pk_value)

View File

@@ -48,7 +48,7 @@ use crate::error::{
ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, SortField};
use crate::row_converter::{build_primary_key_codec_with_fields, SortField};
use crate::sst::file::{FileMeta, FileTimeRange};
use crate::sst::to_sst_arrow_schema;
@@ -391,6 +391,7 @@ impl ReadFormat {
column: &ColumnMetadata,
is_min: bool,
) -> Option<ArrayRef> {
let primary_key_encoding = self.metadata.primary_key_encoding;
let is_first_tag = self
.metadata
.primary_key
@@ -402,9 +403,15 @@ impl ReadFormat {
return None;
}
let converter = DensePrimaryKeyCodec::with_fields(vec![SortField::new(
column.column_schema.data_type.clone(),
)]);
let converter = build_primary_key_codec_with_fields(
primary_key_encoding,
[(
column.column_id,
SortField::new(column.column_schema.data_type.clone()),
)]
.into_iter(),
);
let values = row_groups.iter().map(|meta| {
let stats = meta
.borrow()
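
Because the first tag occupies the prefix of the order-preserving key encoding, the min/max statistics of the encoded primary key also bound the first tag, and the single-field codec built above only needs to decode that prefix. A toy sketch of reading just the leftmost field, with a fixed width standing in for the memcomparable format:

```rust
/// Decode only the leftmost field of an encoded composite key.
/// With an order-preserving encoding, the lexicographic min/max of whole keys
/// starts with the min/max of the first field, so row-group statistics can be reused.
fn decode_leftmost(key: &[u8], first_field_width: usize) -> Option<&[u8]> {
    key.get(..first_field_width)
}

fn main() {
    // Key = first tag (4 bytes here) followed by the rest of the primary key.
    let min_key = [0u8, 0, 0, 1, 9, 9];
    let max_key = [0u8, 0, 0, 7, 0, 0];
    assert_eq!(decode_leftmost(&min_key, 4), Some(&[0u8, 0, 0, 1][..]));
    assert_eq!(decode_leftmost(&max_key, 4), Some(&[0u8, 0, 0, 7][..]));
}
```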

View File

@@ -49,7 +49,7 @@ use crate::metrics::{
};
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::row_converter::DensePrimaryKeyCodec;
use crate::row_converter::build_primary_key_codec;
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
@@ -253,7 +253,7 @@ impl ParquetReaderBuilder {
vec![]
};
let codec = DensePrimaryKeyCodec::new(read_format.metadata());
let codec = build_primary_key_codec(read_format.metadata());
let context = FileRangeContext::new(reader_builder, filters, read_format, codec);

View File

@@ -326,8 +326,8 @@ pub(crate) fn encode_keys(
/// Encode one key.
pub(crate) fn encode_key_by_kv(key_value: &KeyValue) -> Vec<u8> {
let row_codec = DensePrimaryKeyCodec::with_fields(vec![
SortField::new(ConcreteDataType::string_datatype()),
SortField::new(ConcreteDataType::uint32_datatype()),
(0, SortField::new(ConcreteDataType::string_datatype())),
(1, SortField::new(ConcreteDataType::uint32_datatype())),
]);
row_codec.encode(key_value.primary_keys()).unwrap()
}

View File

@@ -85,7 +85,12 @@ pub fn sst_region_metadata() -> RegionMetadata {
/// Encodes a primary key for specific tags.
pub fn new_primary_key(tags: &[&str]) -> Vec<u8> {
let fields = (0..tags.len())
.map(|_| SortField::new(ConcreteDataType::string_datatype()))
.map(|idx| {
(
idx as u32,
SortField::new(ConcreteDataType::string_datatype()),
)
})
.collect();
let converter = DensePrimaryKeyCodec::with_fields(fields);
converter