feat: introduce PrimaryKeyEncoding (#5312)

* feat: introduce `PrimaryKeyEncoding`

* fix: fix unit tests

* chore: add empty line

* test: add unit tests

* chore: fmt code

* refactor: introduce new codec trait to support various encoding

* fix: fix unit tests

* chore: update sqlness result

* chore: apply suggestions from CR

* chore: apply suggestions from CR

Author: Weny Xu
Date: 2025-01-15 14:16:53 +08:00 (committed by GitHub)
Parent: 57f8afcb70
Commit: b64c075cdb
31 changed files with 1043 additions and 776 deletions

View File

@@ -25,6 +25,7 @@ use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable
use mito2::memtable::time_series::TimeSeriesMemtable;
use mito2::memtable::{KeyValues, Memtable};
use mito2::region::options::MergeMode;
use mito2::row_converter::DensePrimaryKeyCodec;
use mito2::test_util::memtable_util::{self, region_metadata_to_row_schema};
use rand::rngs::ThreadRng;
use rand::seq::SliceRandom;
@@ -43,8 +44,14 @@ fn write_rows(c: &mut Criterion) {
// Note that this test only generate one time series.
let mut group = c.benchmark_group("write");
group.bench_function("partition_tree", |b| {
let memtable =
PartitionTreeMemtable::new(1, metadata.clone(), None, &PartitionTreeConfig::default());
let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
let memtable = PartitionTreeMemtable::new(
1,
codec,
metadata.clone(),
None,
&PartitionTreeConfig::default(),
);
let kvs =
memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
b.iter(|| {
@@ -71,7 +78,8 @@ fn full_scan(c: &mut Criterion) {
let mut group = c.benchmark_group("full_scan");
group.sample_size(10);
group.bench_function("partition_tree", |b| {
let memtable = PartitionTreeMemtable::new(1, metadata.clone(), None, &config);
let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
let memtable = PartitionTreeMemtable::new(1, codec, metadata.clone(), None, &config);
for kvs in generator.iter() {
memtable.write(&kvs).unwrap();
}
@@ -108,7 +116,8 @@ fn filter_1_host(c: &mut Criterion) {
let mut group = c.benchmark_group("filter_1_host");
group.sample_size(10);
group.bench_function("partition_tree", |b| {
let memtable = PartitionTreeMemtable::new(1, metadata.clone(), None, &config);
let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
let memtable = PartitionTreeMemtable::new(1, codec, metadata.clone(), None, &config);
for kvs in generator.iter() {
memtable.write(&kvs).unwrap();
}
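
Note on this hunk: the partition-tree memtable no longer builds its row codec internally; the benchmark now constructs a DensePrimaryKeyCodec and passes it in. A minimal sketch of the new call shape, reusing the test helpers from this benchmark (helper signatures assumed to match the code above):

use std::sync::Arc;

use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable};
use mito2::memtable::Memtable;
use mito2::row_converter::DensePrimaryKeyCodec;
use mito2::test_util::memtable_util;

fn build_partition_tree_memtable() {
    // Region metadata helper from the existing test utilities.
    let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true);
    // The codec is now created by the caller and shared through an Arc.
    let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
    let memtable = PartitionTreeMemtable::new(
        1,
        codec,
        metadata.clone(),
        None,
        &PartitionTreeConfig::default(),
    );
    let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &[0, 1, 2], 1);
    memtable.write(&kvs).unwrap();
}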

View File

@@ -553,7 +553,7 @@ async fn test_region_usage() {
// region is empty now, check manifest size
let region = engine.get_region(region_id).unwrap();
let region_stat = region.region_statistic();
assert_eq!(region_stat.manifest_size, 686);
assert_eq!(region_stat.manifest_size, 717);
// put some rows
let rows = Rows {

View File

@@ -592,6 +592,6 @@ mod test {
// get manifest size again
let manifest_size = manager.manifest_usage();
assert_eq!(manifest_size, 1173);
assert_eq!(manifest_size, 1204);
}
}

View File

@@ -154,7 +154,7 @@ async fn manager_with_checkpoint_distance_1() {
.unwrap();
let raw_json = std::str::from_utf8(&raw_bytes).unwrap();
let expected_json =
"{\"size\":848,\"version\":10,\"checksum\":4186457347,\"extend_metadata\":{}}";
"{\"size\":879,\"version\":10,\"checksum\":2245967096,\"extend_metadata\":{}}";
assert_eq!(expected_json, raw_json);
// reopen the manager

View File

@@ -22,7 +22,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use crate::row_converter::McmpRowCodec;
use crate::row_converter::DensePrimaryKeyCodec;
use crate::sst::parquet::file_range::RangeBase;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::SimpleFilterContext;
@@ -41,7 +41,7 @@ impl BulkIterContext {
projection: &Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Self {
let codec = McmpRowCodec::new_with_primary_keys(&region_metadata);
let codec = DensePrimaryKeyCodec::new(&region_metadata);
let simple_filters = predicate
.as_ref()

View File

@@ -48,7 +48,7 @@ use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::BulkPartIter;
use crate::memtable::key_values::KeyValuesRef;
use crate::memtable::BoxedBatchIterator;
use crate::row_converter::{McmpRowCodec, RowCodec};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;
@@ -103,7 +103,7 @@ pub struct BulkPartMeta {
pub struct BulkPartEncoder {
metadata: RegionMetadataRef,
pk_encoder: McmpRowCodec,
pk_encoder: DensePrimaryKeyCodec,
row_group_size: usize,
dedup: bool,
writer_props: Option<WriterProperties>,
@@ -115,7 +115,7 @@ impl BulkPartEncoder {
dedup: bool,
row_group_size: usize,
) -> BulkPartEncoder {
let codec = McmpRowCodec::new_with_primary_keys(&metadata);
let codec = DensePrimaryKeyCodec::new(&metadata);
let writer_props = Some(
WriterProperties::builder()
.set_write_batch_size(row_group_size)
@@ -174,7 +174,7 @@ impl BulkPartEncoder {
fn mutations_to_record_batch(
mutations: &[Mutation],
metadata: &RegionMetadataRef,
pk_encoder: &McmpRowCodec,
pk_encoder: &DensePrimaryKeyCodec,
dedup: bool,
) -> Result<Option<(RecordBatch, i64, i64)>> {
let total_rows: usize = mutations
@@ -538,7 +538,7 @@ mod tests {
.map(|r| r.rows.len())
.sum();
let pk_encoder = McmpRowCodec::new_with_primary_keys(&metadata);
let pk_encoder = DensePrimaryKeyCodec::new(&metadata);
let (batch, _, _) = mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
.unwrap()
@@ -557,7 +557,7 @@ mod tests {
let batch_values = batches
.into_iter()
.map(|b| {
let pk_values = pk_encoder.decode(b.primary_key()).unwrap();
let pk_values = pk_encoder.decode_dense(b.primary_key()).unwrap();
let timestamps = b
.timestamps()
.as_any()
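
The rename in this file is mechanical (McmpRowCodec to DensePrimaryKeyCodec), but the decode side is now explicit about the encoding: decoding the full key is spelled decode_dense. A small round-trip sketch, assuming a RegionMetadataRef like the one used in the test above and values matching the primary key columns:

use datatypes::value::Value;
use mito2::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
use store_api::metadata::RegionMetadataRef;

// `pk_values` must match the region's primary key columns in order and type.
fn round_trip_primary_key(metadata: &RegionMetadataRef, pk_values: &[Value]) {
    let pk_encoder = DensePrimaryKeyCodec::new(metadata);
    // Encoding still uses the iterator-based helper from PrimaryKeyCodecExt.
    let bytes = pk_encoder
        .encode(pk_values.iter().map(|v| v.as_value_ref()))
        .unwrap();
    // Decoding the whole key is now `decode_dense` (formerly `decode`).
    assert_eq!(pk_encoder.decode_dense(&bytes).unwrap(), pk_values.to_vec());
}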

View File

@@ -28,7 +28,9 @@ use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use common_base::readable_size::ReadableSize;
pub(crate) use partition::DensePrimaryKeyFilter;
use serde::{Deserialize, Serialize};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
@@ -43,6 +45,7 @@ use crate::memtable::{
MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
};
use crate::region::options::MergeMode;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec};
/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
pub(crate) const DICTIONARY_SIZE_FACTOR: u64 = 8;
@@ -263,13 +266,14 @@ impl PartitionTreeMemtable {
/// Returns a new memtable.
pub fn new(
id: MemtableId,
row_codec: Arc<dyn PrimaryKeyCodec>,
metadata: RegionMetadataRef,
write_buffer_manager: Option<WriteBufferManagerRef>,
config: &PartitionTreeConfig,
) -> Self {
Self::with_tree(
id,
PartitionTree::new(metadata, config, write_buffer_manager.clone()),
PartitionTree::new(row_codec, metadata, config, write_buffer_manager.clone()),
)
}
@@ -320,12 +324,22 @@ impl PartitionTreeMemtableBuilder {
impl MemtableBuilder for PartitionTreeMemtableBuilder {
fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
Arc::new(PartitionTreeMemtable::new(
id,
metadata.clone(),
self.write_buffer_manager.clone(),
&self.config,
))
match metadata.primary_key_encoding {
PrimaryKeyEncoding::Dense => {
let codec = Arc::new(DensePrimaryKeyCodec::new(metadata));
Arc::new(PartitionTreeMemtable::new(
id,
codec,
metadata.clone(),
self.write_buffer_manager.clone(),
&self.config,
))
}
PrimaryKeyEncoding::Sparse => {
//TODO(weny): Implement sparse primary key encoding.
todo!()
}
}
}
}
@@ -358,7 +372,7 @@ mod tests {
use store_api::storage::RegionId;
use super::*;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::test_util::memtable_util::{
self, collect_iter_timestamps, region_metadata_to_row_schema,
};
@@ -378,8 +392,14 @@ mod tests {
let timestamps = (0..100).collect::<Vec<_>>();
let kvs =
memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
let memtable =
PartitionTreeMemtable::new(1, metadata, None, &PartitionTreeConfig::default());
let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
let memtable = PartitionTreeMemtable::new(
1,
codec,
metadata.clone(),
None,
&PartitionTreeConfig::default(),
);
memtable.write(&kvs).unwrap();
let expected_ts = kvs
@@ -414,8 +434,14 @@ mod tests {
} else {
memtable_util::metadata_with_primary_key(vec![], false)
};
let memtable =
PartitionTreeMemtable::new(1, metadata.clone(), None, &PartitionTreeConfig::default());
let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
let memtable = PartitionTreeMemtable::new(
1,
codec,
metadata.clone(),
None,
&PartitionTreeConfig::default(),
);
let kvs = memtable_util::build_key_values(
&metadata,
@@ -510,8 +536,10 @@ mod tests {
fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true);
let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
let memtable = PartitionTreeMemtable::new(
1,
codec,
metadata.clone(),
None,
&PartitionTreeConfig {
@@ -719,12 +747,7 @@ mod tests {
)
.build(1, &metadata);
let codec = McmpRowCodec::new(
metadata
.primary_key_columns()
.map(|c| SortField::new(c.column_schema.data_type.clone()))
.collect(),
);
let codec = DensePrimaryKeyCodec::new(&metadata);
memtable
.write(&build_key_values(
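
The memtable builder now dispatches on metadata.primary_key_encoding; the Sparse arm is still todo!() in this commit. A hypothetical crate-internal helper (not part of this change) mirroring that dispatch could look like:

use std::sync::Arc;

use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;

use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec};

// Hypothetical helper: pick a codec from the region metadata.
// Only the dense codec exists in this commit; sparse lands in a follow-up.
fn codec_for(metadata: &RegionMetadataRef) -> Arc<dyn PrimaryKeyCodec> {
    match metadata.primary_key_encoding {
        PrimaryKeyEncoding::Dense => Arc::new(DensePrimaryKeyCodec::new(metadata)),
        PrimaryKeyEncoding::Sparse => unimplemented!("sparse primary key codec"),
    }
}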

View File

@@ -22,6 +22,7 @@ use std::time::{Duration, Instant};
use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::storage::ColumnId;
@@ -38,7 +39,7 @@ use crate::memtable::partition_tree::{PartitionTreeConfig, PkId};
use crate::memtable::stats::WriteMetrics;
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
use crate::read::{Batch, BatchBuilder};
use crate::row_converter::{McmpRowCodec, RowCodec};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyFilter};
/// Key of a partition.
pub type PartitionKey = u32;
@@ -65,7 +66,7 @@ impl Partition {
pub fn write_with_key(
&self,
primary_key: &mut Vec<u8>,
row_codec: &McmpRowCodec,
row_codec: &dyn PrimaryKeyCodec,
key_value: KeyValue,
re_encode: bool,
metrics: &mut WriteMetrics,
@@ -85,17 +86,25 @@ impl Partition {
// Key does not yet exist in shard or builder, encode and insert the full primary key.
if re_encode {
// `primary_key` is sparse, re-encode the full primary key.
let sparse_key = primary_key.clone();
primary_key.clear();
row_codec.encode_to_vec(key_value.primary_keys(), primary_key)?;
let pk_id = inner.shard_builder.write_with_key(
primary_key,
Some(&sparse_key),
&key_value,
metrics,
);
inner.pk_to_pk_id.insert(sparse_key, pk_id);
match row_codec.encoding() {
PrimaryKeyEncoding::Dense => {
// `primary_key` is sparse, re-encode the full primary key.
let sparse_key = primary_key.clone();
primary_key.clear();
row_codec.encode_key_value(&key_value, primary_key)?;
let pk_id = inner.shard_builder.write_with_key(
primary_key,
Some(&sparse_key),
&key_value,
metrics,
);
inner.pk_to_pk_id.insert(sparse_key, pk_id);
}
PrimaryKeyEncoding::Sparse => {
// TODO(weny): support sparse primary key.
todo!()
}
}
} else {
// `primary_key` is already the full primary key.
let pk_id = inner
@@ -126,18 +135,23 @@ impl Partition {
Ok(())
}
fn build_primary_key_filter(
need_prune_key: bool,
metadata: &RegionMetadataRef,
row_codec: &dyn PrimaryKeyCodec,
filters: &Arc<Vec<SimpleFilterEvaluator>>,
) -> Option<Box<dyn PrimaryKeyFilter>> {
if need_prune_key {
let filter = row_codec.primary_key_filter(metadata, filters.clone());
Some(filter)
} else {
None
}
}
/// Scans data in the partition.
pub fn read(&self, mut context: ReadPartitionContext) -> Result<PartitionReader> {
let start = Instant::now();
let key_filter = if context.need_prune_key {
Some(PrimaryKeyFilter::new(
context.metadata.clone(),
context.filters.clone(),
context.row_codec.clone(),
))
} else {
None
};
let (builder_source, shard_reader_builders) = {
let inner = self.inner.read().unwrap();
let mut shard_source = Vec::with_capacity(inner.shards.len() + 1);
@@ -157,20 +171,33 @@ impl Partition {
};
context.metrics.num_shards += shard_reader_builders.len();
let mut nodes = shard_reader_builders
.into_iter()
.map(|builder| {
let primary_key_filter = Self::build_primary_key_filter(
context.need_prune_key,
&context.metadata,
context.row_codec.as_ref(),
&context.filters,
);
Ok(ShardNode::new(ShardSource::Shard(
builder.build(key_filter.clone())?,
builder.build(primary_key_filter)?,
)))
})
.collect::<Result<Vec<_>>>()?;
if let Some(builder) = builder_source {
context.metrics.num_builder += 1;
let primary_key_filter = Self::build_primary_key_filter(
context.need_prune_key,
&context.metadata,
context.row_codec.as_ref(),
&context.filters,
);
// Move the initialization of ShardBuilderReader out of read lock.
let shard_builder_reader =
builder.build(Some(&context.pk_weights), key_filter.clone())?;
builder.build(Some(&context.pk_weights), primary_key_filter)?;
nodes.push(ShardNode::new(ShardSource::Builder(shard_builder_reader)));
}
@@ -354,19 +381,20 @@ impl PartitionReader {
}
}
/// Dense primary key filter.
#[derive(Clone)]
pub(crate) struct PrimaryKeyFilter {
pub struct DensePrimaryKeyFilter {
metadata: RegionMetadataRef,
filters: Arc<Vec<SimpleFilterEvaluator>>,
codec: Arc<McmpRowCodec>,
codec: DensePrimaryKeyCodec,
offsets_buf: Vec<usize>,
}
impl PrimaryKeyFilter {
impl DensePrimaryKeyFilter {
pub(crate) fn new(
metadata: RegionMetadataRef,
filters: Arc<Vec<SimpleFilterEvaluator>>,
codec: Arc<McmpRowCodec>,
codec: DensePrimaryKeyCodec,
) -> Self {
Self {
metadata,
@@ -375,8 +403,10 @@ impl PrimaryKeyFilter {
offsets_buf: Vec::new(),
}
}
}
pub(crate) fn prune_primary_key(&mut self, pk: &[u8]) -> bool {
impl PrimaryKeyFilter for DensePrimaryKeyFilter {
fn prune_primary_key(&mut self, pk: &[u8]) -> bool {
if self.filters.is_empty() {
return true;
}
@@ -428,7 +458,7 @@ impl PrimaryKeyFilter {
/// Structs to reuse across readers to avoid allocating for each reader.
pub(crate) struct ReadPartitionContext {
metadata: RegionMetadataRef,
row_codec: Arc<McmpRowCodec>,
row_codec: Arc<dyn PrimaryKeyCodec>,
projection: HashSet<ColumnId>,
filters: Arc<Vec<SimpleFilterEvaluator>>,
/// Buffer to store pk weights.
@@ -467,16 +497,16 @@ impl Drop for ReadPartitionContext {
impl ReadPartitionContext {
pub(crate) fn new(
metadata: RegionMetadataRef,
row_codec: Arc<McmpRowCodec>,
row_codec: Arc<dyn PrimaryKeyCodec>,
projection: HashSet<ColumnId>,
filters: Vec<SimpleFilterEvaluator>,
filters: Arc<Vec<SimpleFilterEvaluator>>,
) -> ReadPartitionContext {
let need_prune_key = Self::need_prune_key(&metadata, &filters);
ReadPartitionContext {
metadata,
row_codec,
projection,
filters: Arc::new(filters),
filters,
pk_weights: Vec::new(),
need_prune_key,
metrics: Default::default(),
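
PrimaryKeyFilter is now a trait produced by the codec rather than a concrete struct owned by the partition, so every reader gets a boxed filter built per source. A short sketch of the factory path, with the same types as above; shard readers then call prune_primary_key on each dictionary key against the pushed-down filters:

use std::sync::Arc;

use common_recordbatch::filter::SimpleFilterEvaluator;
use store_api::metadata::RegionMetadataRef;

use crate::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};

// Sketch: the codec decides which PrimaryKeyFilter implementation to return
// (DensePrimaryKeyFilter for the dense encoding); readers only see the boxed
// trait object.
fn filter_for_reader(
    need_prune_key: bool,
    metadata: &RegionMetadataRef,
    codec: &dyn PrimaryKeyCodec,
    filters: &Arc<Vec<SimpleFilterEvaluator>>,
) -> Option<Box<dyn PrimaryKeyFilter>> {
    need_prune_key.then(|| codec.primary_key_filter(metadata, filters.clone()))
}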

View File

@@ -26,10 +26,10 @@ use crate::memtable::partition_tree::data::{
};
use crate::memtable::partition_tree::dict::KeyDictRef;
use crate::memtable::partition_tree::merger::{Merger, Node};
use crate::memtable::partition_tree::partition::PrimaryKeyFilter;
use crate::memtable::partition_tree::shard_builder::ShardBuilderReader;
use crate::memtable::partition_tree::{PkId, PkIndex, ShardId};
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
use crate::row_converter::PrimaryKeyFilter;
/// Shard stores data related to the same key dictionary.
pub struct Shard {
@@ -146,7 +146,10 @@ pub struct ShardReaderBuilder {
}
impl ShardReaderBuilder {
pub(crate) fn build(self, key_filter: Option<PrimaryKeyFilter>) -> Result<ShardReader> {
pub(crate) fn build(
self,
key_filter: Option<Box<dyn PrimaryKeyFilter>>,
) -> Result<ShardReader> {
let ShardReaderBuilder {
shard_id,
key_dict,
@@ -163,7 +166,7 @@ pub struct ShardReader {
shard_id: ShardId,
key_dict: Option<KeyDictRef>,
parts_reader: DataPartsReader,
key_filter: Option<PrimaryKeyFilter>,
key_filter: Option<Box<dyn PrimaryKeyFilter>>,
last_yield_pk_index: Option<PkIndex>,
keys_before_pruning: usize,
keys_after_pruning: usize,
@@ -176,7 +179,7 @@ impl ShardReader {
shard_id: ShardId,
key_dict: Option<KeyDictRef>,
parts_reader: DataPartsReader,
key_filter: Option<PrimaryKeyFilter>,
key_filter: Option<Box<dyn PrimaryKeyFilter>>,
data_build_cost: Duration,
) -> Result<Self> {
let has_pk = key_dict.is_some();

View File

@@ -26,11 +26,11 @@ use crate::memtable::partition_tree::data::{
DataBatch, DataBuffer, DataBufferReader, DataBufferReaderBuilder, DataParts, DATA_INIT_CAP,
};
use crate::memtable::partition_tree::dict::{DictBuilderReader, KeyDictBuilder};
use crate::memtable::partition_tree::partition::PrimaryKeyFilter;
use crate::memtable::partition_tree::shard::Shard;
use crate::memtable::partition_tree::{PartitionTreeConfig, PkId, PkIndex, ShardId};
use crate::memtable::stats::WriteMetrics;
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
use crate::row_converter::PrimaryKeyFilter;
/// Builder to write keys and data to a shard that the key dictionary
/// is still active.
@@ -189,7 +189,7 @@ impl ShardBuilderReaderBuilder {
pub(crate) fn build(
self,
pk_weights: Option<&[u16]>,
key_filter: Option<PrimaryKeyFilter>,
key_filter: Option<Box<dyn PrimaryKeyFilter>>,
) -> Result<ShardBuilderReader> {
let now = Instant::now();
let data_reader = self.data_reader.build(pk_weights)?;
@@ -208,7 +208,7 @@ pub struct ShardBuilderReader {
shard_id: ShardId,
dict_reader: DictBuilderReader,
data_reader: DataBufferReader,
key_filter: Option<PrimaryKeyFilter>,
key_filter: Option<Box<dyn PrimaryKeyFilter>>,
last_yield_pk_index: Option<PkIndex>,
keys_before_pruning: usize,
keys_after_pruning: usize,
@@ -221,7 +221,7 @@ impl ShardBuilderReader {
shard_id: ShardId,
dict_reader: DictBuilderReader,
data_reader: DataBufferReader,
key_filter: Option<PrimaryKeyFilter>,
key_filter: Option<Box<dyn PrimaryKeyFilter>>,
data_build_cost: Duration,
) -> Result<Self> {
let mut reader = ShardBuilderReader {

View File

@@ -43,7 +43,7 @@ use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_ST
use crate::read::dedup::LastNonNullIter;
use crate::read::Batch;
use crate::region::options::MergeMode;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::row_converter::{PrimaryKeyCodec, SortField};
/// The partition tree.
pub struct PartitionTree {
@@ -52,7 +52,7 @@ pub struct PartitionTree {
/// Metadata of the region.
pub(crate) metadata: RegionMetadataRef,
/// Primary key codec.
row_codec: Arc<McmpRowCodec>,
row_codec: Arc<dyn PrimaryKeyCodec>,
/// Partitions in the tree.
partitions: RwLock<BTreeMap<PartitionKey, PartitionRef>>,
/// Whether the tree has multiple partitions.
@@ -65,16 +65,11 @@ pub struct PartitionTree {
impl PartitionTree {
/// Creates a new partition tree.
pub fn new(
row_codec: Arc<dyn PrimaryKeyCodec>,
metadata: RegionMetadataRef,
config: &PartitionTreeConfig,
write_buffer_manager: Option<WriteBufferManagerRef>,
) -> PartitionTree {
let row_codec = McmpRowCodec::new(
metadata
.primary_key_columns()
.map(|c| SortField::new(c.column_schema.data_type.clone()))
.collect(),
);
) -> Self {
let sparse_encoder = SparseEncoder {
fields: metadata
.primary_key_columns()
@@ -93,7 +88,7 @@ impl PartitionTree {
PartitionTree {
config,
metadata,
row_codec: Arc::new(row_codec),
row_codec,
partitions: Default::default(),
is_partitioned,
write_buffer_manager,
@@ -141,7 +136,7 @@ impl PartitionTree {
self.sparse_encoder
.encode_to_vec(kv.primary_keys(), pk_buffer)?;
} else {
self.row_codec.encode_to_vec(kv.primary_keys(), pk_buffer)?;
self.row_codec.encode_key_value(&kv, pk_buffer)?;
}
// Write rows with
@@ -191,7 +186,7 @@ impl PartitionTree {
self.sparse_encoder
.encode_to_vec(kv.primary_keys(), pk_buffer)?;
} else {
self.row_codec.encode_to_vec(kv.primary_keys(), pk_buffer)?;
self.row_codec.encode_key_value(&kv, pk_buffer)?;
}
// Write rows with
@@ -238,7 +233,7 @@ impl PartitionTree {
self.metadata.clone(),
self.row_codec.clone(),
projection,
filters,
Arc::new(filters),
);
iter.fetch_next_partition(context)?;
@@ -278,7 +273,12 @@ impl PartitionTree {
|| self.metadata.column_metadatas != metadata.column_metadatas
{
// The schema has changed, we can't reuse the tree.
return PartitionTree::new(metadata, &self.config, self.write_buffer_manager.clone());
return PartitionTree::new(
self.row_codec.clone(),
metadata,
&self.config,
self.write_buffer_manager.clone(),
);
}
let mut total_shared_size = 0;
@@ -353,7 +353,7 @@ impl PartitionTree {
partition.write_with_key(
primary_key,
&self.row_codec,
self.row_codec.as_ref(),
key_value,
self.is_partitioned, // If tree is partitioned, re-encode is required to get the full primary key.
metrics,
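
PartitionTree::new now takes the codec as its first argument, and fork() passes the existing codec along instead of rebuilding one from the metadata. A construction sketch; the module paths for the crate-internal types are assumptions:

use std::sync::Arc;

use store_api::metadata::RegionMetadataRef;

use crate::memtable::partition_tree::tree::PartitionTree;
use crate::memtable::partition_tree::PartitionTreeConfig;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec};

// Sketch: the caller owns the codec choice; the tree no longer derives a
// McmpRowCodec from the metadata internally.
fn new_dense_tree(metadata: RegionMetadataRef) -> PartitionTree {
    let codec: Arc<dyn PrimaryKeyCodec> = Arc::new(DensePrimaryKeyCodec::new(&metadata));
    PartitionTree::new(codec, metadata, &PartitionTreeConfig::default(), None)
}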

View File

@@ -51,7 +51,7 @@ use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::dedup::LastNonNullIter;
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::region::options::MergeMode;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
/// Initial vector builder capacity.
const INITIAL_BUILDER_CAPACITY: usize = 0;
@@ -95,7 +95,7 @@ impl MemtableBuilder for TimeSeriesMemtableBuilder {
pub struct TimeSeriesMemtable {
id: MemtableId,
region_metadata: RegionMetadataRef,
row_codec: Arc<McmpRowCodec>,
row_codec: Arc<DensePrimaryKeyCodec>,
series_set: SeriesSet,
alloc_tracker: AllocTracker,
max_timestamp: AtomicI64,
@@ -115,12 +115,7 @@ impl TimeSeriesMemtable {
dedup: bool,
merge_mode: MergeMode,
) -> Self {
let row_codec = Arc::new(McmpRowCodec::new(
region_metadata
.primary_key_columns()
.map(|c| SortField::new(c.column_schema.data_type.clone()))
.collect(),
));
let row_codec = Arc::new(DensePrimaryKeyCodec::new(&region_metadata));
let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
let dedup = if merge_mode == MergeMode::LastNonNull {
false
@@ -350,11 +345,11 @@ type SeriesRwLockMap = RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Series>>>>;
struct SeriesSet {
region_metadata: RegionMetadataRef,
series: Arc<SeriesRwLockMap>,
codec: Arc<McmpRowCodec>,
codec: Arc<DensePrimaryKeyCodec>,
}
impl SeriesSet {
fn new(region_metadata: RegionMetadataRef, codec: Arc<McmpRowCodec>) -> Self {
fn new(region_metadata: RegionMetadataRef, codec: Arc<DensePrimaryKeyCodec>) -> Self {
Self {
region_metadata,
series: Default::default(),
@@ -451,7 +446,7 @@ struct Iter {
predicate: Vec<SimpleFilterEvaluator>,
pk_schema: arrow::datatypes::SchemaRef,
pk_datatypes: Vec<ConcreteDataType>,
codec: Arc<McmpRowCodec>,
codec: Arc<DensePrimaryKeyCodec>,
dedup: bool,
metrics: Metrics,
}
@@ -465,7 +460,7 @@ impl Iter {
predicate: Option<Predicate>,
pk_schema: arrow::datatypes::SchemaRef,
pk_datatypes: Vec<ConcreteDataType>,
codec: Arc<McmpRowCodec>,
codec: Arc<DensePrimaryKeyCodec>,
dedup: bool,
) -> Result<Self> {
let predicate = predicate
@@ -560,7 +555,7 @@ impl Iterator for Iter {
}
fn prune_primary_key(
codec: &Arc<McmpRowCodec>,
codec: &Arc<DensePrimaryKeyCodec>,
pk: &[u8],
series: &mut Series,
datatypes: &[ConcreteDataType],
@@ -896,6 +891,7 @@ mod tests {
use store_api::storage::RegionId;
use super::*;
use crate::row_converter::SortField;
fn schema_for_test() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
@@ -1160,7 +1156,7 @@ mod tests {
#[test]
fn test_series_set_concurrency() {
let schema = schema_for_test();
let row_codec = Arc::new(McmpRowCodec::new(
let row_codec = Arc::new(DensePrimaryKeyCodec::with_fields(
schema
.primary_key_columns()
.map(|c| SortField::new(c.column_schema.data_type.clone()))
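
The time-series memtable keeps a concrete DensePrimaryKeyCodec, and the concurrency test above uses the new with_fields constructor for an explicit field list. A sketch of both construction paths, with illustrative field types:

use std::sync::Arc;

use datatypes::data_type::ConcreteDataType;

use crate::row_converter::{DensePrimaryKeyCodec, SortField};

// From metadata (the common path inside TimeSeriesMemtable::new):
//   let row_codec = Arc::new(DensePrimaryKeyCodec::new(&region_metadata));
// From an explicit field list (the test path above):
fn codec_from_fields() -> Arc<DensePrimaryKeyCodec> {
    Arc::new(DensePrimaryKeyCodec::with_fields(vec![
        SortField::new(ConcreteDataType::string_datatype()),
        SortField::new(ConcreteDataType::int64_datatype()),
    ]))
}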

View File

@@ -26,7 +26,7 @@ use store_api::storage::ColumnId;
use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, Result};
use crate::read::projection::ProjectionMapper;
use crate::read::{Batch, BatchColumn, BatchReader};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, SortField};
/// Reader to adapt schema of underlying reader to expected schema.
pub struct CompatReader<R> {
@@ -127,7 +127,7 @@ pub(crate) fn has_same_columns(left: &RegionMetadata, right: &RegionMetadata) ->
#[derive(Debug)]
struct CompatPrimaryKey {
/// Row converter to append values to primary keys.
converter: McmpRowCodec,
converter: DensePrimaryKeyCodec,
/// Default values to append.
values: Vec<Value>,
}
@@ -138,10 +138,7 @@ impl CompatPrimaryKey {
let mut buffer =
Vec::with_capacity(batch.primary_key().len() + self.converter.estimated_size());
buffer.extend_from_slice(batch.primary_key());
self.converter.encode_to_vec(
self.values.iter().map(|value| value.as_value_ref()),
&mut buffer,
)?;
self.converter.encode_values(&self.values, &mut buffer)?;
batch.set_primary_key(buffer);
@@ -268,7 +265,7 @@ fn may_compat_primary_key(
})?;
values.push(default_value);
}
let converter = McmpRowCodec::new(fields);
let converter = DensePrimaryKeyCodec::with_fields(fields);
Ok(Some(CompatPrimaryKey { converter, values }))
}
@@ -366,6 +363,7 @@ mod tests {
use store_api::storage::RegionId;
use super::*;
use crate::row_converter::PrimaryKeyCodecExt;
use crate::test_util::{check_reader_result, VecBatchReader};
/// Creates a new [RegionMetadata].
@@ -400,7 +398,7 @@ mod tests {
let fields = (0..keys.len())
.map(|_| SortField::new(ConcreteDataType::string_datatype()))
.collect();
let converter = McmpRowCodec::new(fields);
let converter = DensePrimaryKeyCodec::with_fields(fields);
let row = keys.iter().map(|str_opt| match str_opt {
Some(v) => ValueRef::String(v),
None => ValueRef::Null,
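
CompatPrimaryKey now appends its default values through encode_values on owned Values instead of encode_to_vec over ValueRefs. A compact sketch of that append step, with the fields and defaults passed in for illustration:

use datatypes::value::Value;

use crate::error::Result;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, SortField};

// Sketch: extend an already-encoded key with encoded defaults for primary key
// columns missing from the older data, mirroring the append in
// CompatPrimaryKey above.
fn append_default_keys(
    existing_key: &[u8],
    fields: Vec<SortField>,
    defaults: &[Value],
) -> Result<Vec<u8>> {
    let converter = DensePrimaryKeyCodec::with_fields(fields);
    let mut buffer = Vec::from(existing_key);
    converter.encode_values(defaults, &mut buffer)?;
    Ok(buffer)
}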

View File

@@ -33,7 +33,7 @@ use store_api::storage::ColumnId;
use crate::cache::CacheStrategy;
use crate::error::{InvalidRequestSnafu, Result};
use crate::read::Batch;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec};
/// Only cache vector when its length `<=` this value.
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
@@ -47,7 +47,7 @@ pub struct ProjectionMapper {
/// Output record batch contains tags.
has_tags: bool,
/// Decoder for primary key.
codec: McmpRowCodec,
codec: DensePrimaryKeyCodec,
/// Schema for converted [RecordBatch].
output_schema: SchemaRef,
/// Ids of columns to project. It keeps ids in the same order as the `projection`
@@ -80,12 +80,7 @@ impl ProjectionMapper {
// Safety: idx is valid.
column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
}
let codec = McmpRowCodec::new(
metadata
.primary_key_columns()
.map(|c| SortField::new(c.column_schema.data_type.clone()))
.collect(),
);
let codec = DensePrimaryKeyCodec::new(metadata);
// Safety: Columns come from existing schema.
let output_schema = Arc::new(Schema::new(column_schemas));
// Get fields in each batch.
@@ -186,7 +181,7 @@ impl ProjectionMapper {
Some(v) => v.to_vec(),
None => self
.codec
.decode(batch.primary_key())
.decode_dense(batch.primary_key())
.map_err(BoxedError::new)
.context(ExternalSnafu)?,
}
@@ -291,6 +286,7 @@ mod tests {
use super::*;
use crate::cache::CacheManager;
use crate::read::BatchBuilder;
use crate::row_converter::{PrimaryKeyCodecExt, SortField};
use crate::test_util::meta_util::TestRegionMetadataBuilder;
fn new_batch(
@@ -299,7 +295,7 @@ mod tests {
fields: &[(ColumnId, i64)],
num_rows: usize,
) -> Batch {
let converter = McmpRowCodec::new(
let converter = DensePrimaryKeyCodec::with_fields(
(0..tags.len())
.map(|_| SortField::new(ConcreteDataType::int64_datatype()))
.collect(),

View File

@@ -24,10 +24,10 @@ use futures::future::BoxFuture;
use futures::StreamExt;
use object_store::manager::ObjectStoreManagerRef;
use object_store::util::{join_dir, normalize_dir};
use snafu::{ensure, OptionExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::region_engine::RegionRole;
use store_api::storage::{ColumnId, RegionId};
@@ -35,7 +35,8 @@ use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
EmptyRegionDirSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu, Result, StaleLogEntrySnafu,
EmptyRegionDirSnafu, InvalidMetadataSnafu, ObjectStoreNotFoundSnafu, RegionCorruptedSnafu,
Result, StaleLogEntrySnafu,
};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::manifest::storage::manifest_compress_type;
@@ -59,7 +60,7 @@ use crate::wal::{EntryId, Wal};
/// Builder to create a new [MitoRegion] or open an existing one.
pub(crate) struct RegionOpener {
region_id: RegionId,
metadata: Option<RegionMetadata>,
metadata_builder: Option<RegionMetadataBuilder>,
memtable_builder_provider: MemtableBuilderProvider,
object_store_manager: ObjectStoreManagerRef,
region_dir: String,
@@ -90,7 +91,7 @@ impl RegionOpener {
) -> RegionOpener {
RegionOpener {
region_id,
metadata: None,
metadata_builder: None,
memtable_builder_provider,
object_store_manager,
region_dir: normalize_dir(region_dir),
@@ -106,16 +107,27 @@ impl RegionOpener {
}
}
/// Sets metadata of the region to create.
pub(crate) fn metadata(mut self, metadata: RegionMetadata) -> Self {
self.metadata = Some(metadata);
/// Sets metadata builder of the region to create.
pub(crate) fn metadata_builder(mut self, builder: RegionMetadataBuilder) -> Self {
self.metadata_builder = Some(builder);
self
}
/// Builds the region metadata.
///
/// # Panics
/// - Panics if `options` is not set.
/// - Panics if `metadata_builder` is not set.
fn build_metadata(&mut self) -> Result<RegionMetadata> {
let options = self.options.as_ref().unwrap();
let mut metadata_builder = self.metadata_builder.take().unwrap();
metadata_builder.primary_key_encoding(options.primary_key_encoding());
metadata_builder.build().context(InvalidMetadataSnafu)
}
/// Parses and sets options for the region.
pub(crate) fn parse_options(mut self, options: HashMap<String, String>) -> Result<Self> {
self.options = Some(RegionOptions::try_from(&options)?);
Ok(self)
pub(crate) fn parse_options(self, options: HashMap<String, String>) -> Result<Self> {
self.options(RegionOptions::try_from(&options)?)
}
/// If a [WalEntryReader] is set, the [RegionOpener] will use [WalEntryReader] instead of
@@ -151,21 +163,21 @@ impl RegionOpener {
/// Opens the region if it already exists.
///
/// # Panics
/// - Panics if metadata is not set.
/// - Panics if options is not set.
/// - Panics if `metadata_builder` is not set.
/// - Panics if `options` is not set.
pub(crate) async fn create_or_open<S: LogStore>(
mut self,
config: &MitoConfig,
wal: &Wal<S>,
) -> Result<MitoRegion> {
let region_id = self.region_id;
let metadata = self.build_metadata()?;
// Tries to open the region.
match self.maybe_open(config, wal).await {
Ok(Some(region)) => {
let recovered = region.metadata();
// Checks the schema of the region.
let expect = self.metadata.as_ref().unwrap();
let expect = &metadata;
check_recovered_region(
&recovered,
expect.region_id,
@@ -189,13 +201,13 @@ impl RegionOpener {
);
}
}
// Safety: must be set before calling this method.
let options = self.options.take().unwrap();
let object_store = self.object_store(&options.storage)?.clone();
let provider = self.provider(&options.wal_options);
let metadata = Arc::new(metadata);
// Create a manifest manager for this region and writes regions to the manifest file.
let region_manifest_options = self.manifest_options(config, &options)?;
let metadata = Arc::new(self.metadata.unwrap());
let manifest_manager = RegionManifestManager::new(
metadata.clone(),
region_manifest_options,
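
The opener now receives a RegionMetadataBuilder so that the encoding chosen in the region options can be applied before the metadata is built (see build_metadata above). A caller-side sketch; module paths and the surrounding setup are assumptions:

use std::collections::HashMap;

use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataBuilder;
use store_api::storage::RegionId;

use crate::config::MitoConfig;
use crate::error::Result;
use crate::region::opener::RegionOpener;
use crate::region::MitoRegion;
use crate::wal::Wal;

// Sketch: pass the builder, not a finished RegionMetadata; build_metadata
// stamps options.primary_key_encoding() onto it before building.
async fn create_region<S: LogStore>(
    opener: RegionOpener,
    options: HashMap<String, String>,
    config: &MitoConfig,
    wal: &Wal<S>,
) -> Result<MitoRegion> {
    let builder = RegionMetadataBuilder::new(RegionId::new(1024, 0));
    // ... push column metadatas and the primary key onto `builder` here ...
    opener
        .metadata_builder(builder)
        .parse_options(options)?
        .create_or_open(config, wal)
        .await
}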

View File

@@ -27,6 +27,7 @@ use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value;
use serde_with::{serde_as, with_prefix, DisplayFromStr, NoneAsEmptyString};
use snafu::{ensure, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::storage::ColumnId;
use strum::EnumString;
@@ -93,10 +94,19 @@ impl RegionOptions {
!self.append_mode
}
/// Returns the `merge_mode` if it is set, otherwise returns the default `MergeMode`.
/// Returns the `merge_mode` if it is set, otherwise returns the default [`MergeMode`].
pub fn merge_mode(&self) -> MergeMode {
self.merge_mode.unwrap_or_default()
}
/// Returns the `primary_key_encoding` if it is set, otherwise returns the default [`PrimaryKeyEncoding`].
pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
self.memtable
.as_ref()
.map_or(PrimaryKeyEncoding::default(), |memtable| {
memtable.primary_key_encoding()
})
}
}
impl TryFrom<&HashMap<String, String>> for RegionOptions {
@@ -319,6 +329,16 @@ pub enum MemtableOptions {
with_prefix!(prefix_partition_tree "memtable.partition_tree.");
impl MemtableOptions {
/// Returns the primary key encoding mode.
pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
match self {
MemtableOptions::PartitionTree(opts) => opts.primary_key_encoding,
_ => PrimaryKeyEncoding::Dense,
}
}
}
/// Partition tree memtable options.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -332,6 +352,8 @@ pub struct PartitionTreeOptions {
pub data_freeze_threshold: usize,
/// Total bytes of dictionary to keep in fork.
pub fork_dictionary_bytes: ReadableSize,
/// Primary key encoding mode.
pub primary_key_encoding: PrimaryKeyEncoding,
}
impl Default for PartitionTreeOptions {
@@ -350,6 +372,7 @@ impl Default for PartitionTreeOptions {
index_max_keys_per_shard: DEFAULT_MAX_KEYS_PER_SHARD,
data_freeze_threshold: DEFAULT_FREEZE_THRESHOLD,
fork_dictionary_bytes,
primary_key_encoding: PrimaryKeyEncoding::Dense,
}
}
}
@@ -644,6 +667,7 @@ mod tests {
index_max_keys_per_shard: 2048,
data_freeze_threshold: 2048,
fork_dictionary_bytes: ReadableSize::mb(128),
primary_key_encoding: PrimaryKeyEncoding::Dense,
})),
merge_mode: Some(MergeMode::LastNonNull),
};
@@ -679,6 +703,7 @@ mod tests {
index_max_keys_per_shard: 2048,
data_freeze_threshold: 2048,
fork_dictionary_bytes: ReadableSize::mb(128),
primary_key_encoding: PrimaryKeyEncoding::Dense,
})),
merge_mode: Some(MergeMode::LastNonNull),
};
@@ -747,6 +772,7 @@ mod tests {
index_max_keys_per_shard: 2048,
data_freeze_threshold: 2048,
fork_dictionary_bytes: ReadableSize::mb(128),
primary_key_encoding: PrimaryKeyEncoding::Dense,
})),
merge_mode: Some(MergeMode::LastNonNull),
};
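
The encoding is carried by the partition-tree memtable options and defaults to Dense; memtables other than the partition tree always report Dense. A test-style sketch of the accessor inside this module; only the existing memtable.type option key is exercised, and derived trait impls on PrimaryKeyEncoding are assumed:

use std::collections::HashMap;

use store_api::codec::PrimaryKeyEncoding;

// Without an explicit override, the partition tree keeps the dense default.
#[test]
fn primary_key_encoding_defaults_to_dense() {
    let map: HashMap<String, String> =
        [("memtable.type".to_string(), "partition_tree".to_string())]
            .into_iter()
            .collect();
    let options = RegionOptions::try_from(&map).unwrap();
    assert_eq!(options.primary_key_encoding(), PrimaryKeyEncoding::Dense);
}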

View File

@@ -12,32 +12,32 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use bytes::Buf;
use common_base::bytes::Bytes;
use common_decimal::Decimal128;
use common_time::time::Time;
use common_time::{Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::Value;
use datatypes::types::IntervalType;
use datatypes::value::ValueRef;
use memcomparable::{Deserializer, Serializer};
use paste::paste;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::metadata::RegionMetadata;
mod dense;
use crate::error;
use crate::error::{FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, SerializeFieldSnafu};
use std::sync::Arc;
use common_recordbatch::filter::SimpleFilterEvaluator;
use datatypes::value::{Value, ValueRef};
pub use dense::{DensePrimaryKeyCodec, SortField};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
/// Row value encoder/decoder.
pub trait RowCodec {
pub trait PrimaryKeyCodecExt {
/// Encodes rows to bytes.
/// # Note
/// Ensure the length of row iterator matches the length of fields.
fn encode<'a, I>(&self, row: I) -> Result<Vec<u8>>
where
I: Iterator<Item = ValueRef<'a>>;
I: Iterator<Item = ValueRef<'a>>,
{
let mut buffer = Vec::new();
self.encode_to_vec(row, &mut buffer)?;
Ok(buffer)
}
/// Encodes rows to specific vec.
/// # Note
@@ -50,594 +50,41 @@ pub trait RowCodec {
fn decode(&self, bytes: &[u8]) -> Result<Vec<Value>>;
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
pub(crate) data_type: ConcreteDataType,
pub trait PrimaryKeyFilter: Send + Sync {
/// Returns true if need to prune the primary key.
fn prune_primary_key(&mut self, pk: &[u8]) -> bool;
}
impl SortField {
pub fn new(data_type: ConcreteDataType) -> Self {
Self { data_type }
}
pub trait PrimaryKeyCodec: Send + Sync {
/// Encodes a key value to bytes.
fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()>;
pub fn estimated_size(&self) -> usize {
match &self.data_type {
ConcreteDataType::Boolean(_) => 2,
ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2,
ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3,
ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5,
ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9,
ConcreteDataType::Float32(_) => 5,
ConcreteDataType::Float64(_) => 9,
ConcreteDataType::Binary(_)
| ConcreteDataType::Json(_)
| ConcreteDataType::Vector(_) => 11,
ConcreteDataType::String(_) => 11, // a non-empty string takes at least 11 bytes.
ConcreteDataType::Date(_) => 5,
ConcreteDataType::DateTime(_) => 9,
ConcreteDataType::Timestamp(_) => 10,
ConcreteDataType::Time(_) => 10,
ConcreteDataType::Duration(_) => 10,
ConcreteDataType::Interval(_) => 18,
ConcreteDataType::Decimal128(_) => 19,
ConcreteDataType::Null(_)
| ConcreteDataType::List(_)
| ConcreteDataType::Dictionary(_) => 0,
}
}
}
/// Encodes values to bytes.
fn encode_values(&self, values: &[Value], buffer: &mut Vec<u8>) -> Result<()>;
impl SortField {
pub(crate) fn serialize(
/// Returns the number of fields in the primary key.
fn num_fields(&self) -> usize;
/// Returns a primary key filter factory.
fn primary_key_filter(
&self,
serializer: &mut Serializer<&mut Vec<u8>>,
value: &ValueRef,
) -> Result<()> {
macro_rules! cast_value_and_serialize {
(
$self: ident;
$serializer: ident;
$(
$ty: ident, $f: ident
),*
) => {
match &$self.data_type {
$(
ConcreteDataType::$ty(_) => {
paste!{
value
.[<as_ $f>]()
.context(FieldTypeMismatchSnafu)?
.serialize($serializer)
.context(SerializeFieldSnafu)?;
}
}
)*
ConcreteDataType::Timestamp(_) => {
let timestamp = value.as_timestamp().context(FieldTypeMismatchSnafu)?;
timestamp
.map(|t|t.value())
.serialize($serializer)
.context(SerializeFieldSnafu)?;
}
ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
let interval = value.as_interval_year_month().context(FieldTypeMismatchSnafu)?;
interval.map(|i| i.to_i32())
.serialize($serializer)
.context(SerializeFieldSnafu)?;
}
ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
let interval = value.as_interval_day_time().context(FieldTypeMismatchSnafu)?;
interval.map(|i| i.to_i64())
.serialize($serializer)
.context(SerializeFieldSnafu)?;
}
ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
let interval = value.as_interval_month_day_nano().context(FieldTypeMismatchSnafu)?;
interval.map(|i| i.to_i128())
.serialize($serializer)
.context(SerializeFieldSnafu)?;
}
ConcreteDataType::List(_) |
ConcreteDataType::Dictionary(_) |
ConcreteDataType::Null(_) => {
return error::NotSupportedFieldSnafu {
data_type: $self.data_type.clone()
}.fail()
}
}
};
}
cast_value_and_serialize!(self; serializer;
Boolean, boolean,
Binary, binary,
Int8, i8,
UInt8, u8,
Int16, i16,
UInt16, u16,
Int32, i32,
UInt32, u32,
Int64, i64,
UInt64, u64,
Float32, f32,
Float64, f64,
String, string,
Date, date,
DateTime, datetime,
Time, time,
Duration, duration,
Decimal128, decimal128,
Json, binary,
Vector, binary
);
metadata: &RegionMetadataRef,
filters: Arc<Vec<SimpleFilterEvaluator>>,
) -> Box<dyn PrimaryKeyFilter>;
Ok(())
/// Returns the estimated size of the primary key.
fn estimated_size(&self) -> Option<usize> {
None
}
fn deserialize<B: Buf>(&self, deserializer: &mut Deserializer<B>) -> Result<Value> {
use common_time::DateTime;
macro_rules! deserialize_and_build_value {
(
$self: ident;
$serializer: ident;
$(
$ty: ident, $f: ident
),*
) => {
/// Returns the encoding type of the primary key.
fn encoding(&self) -> PrimaryKeyEncoding;
match &$self.data_type {
$(
ConcreteDataType::$ty(_) => {
Ok(Value::from(Option::<$f>::deserialize(deserializer).context(error::DeserializeFieldSnafu)?))
}
)*
ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) | ConcreteDataType::Vector(_) => Ok(Value::from(
Option::<Vec<u8>>::deserialize(deserializer)
.context(error::DeserializeFieldSnafu)?
.map(Bytes::from),
)),
ConcreteDataType::Timestamp(ty) => {
let timestamp = Option::<i64>::deserialize(deserializer)
.context(error::DeserializeFieldSnafu)?
.map(|t|ty.create_timestamp(t));
Ok(Value::from(timestamp))
}
ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
let interval = Option::<i32>::deserialize(deserializer)
.context(error::DeserializeFieldSnafu)?
.map(IntervalYearMonth::from_i32);
Ok(Value::from(interval))
}
ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
let interval = Option::<i64>::deserialize(deserializer)
.context(error::DeserializeFieldSnafu)?
.map(IntervalDayTime::from_i64);
Ok(Value::from(interval))
}
ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
let interval = Option::<i128>::deserialize(deserializer)
.context(error::DeserializeFieldSnafu)?
.map(IntervalMonthDayNano::from_i128);
Ok(Value::from(interval))
}
ConcreteDataType::List(l) => NotSupportedFieldSnafu {
data_type: ConcreteDataType::List(l.clone()),
}
.fail(),
ConcreteDataType::Dictionary(d) => NotSupportedFieldSnafu {
data_type: ConcreteDataType::Dictionary(d.clone()),
}
.fail(),
ConcreteDataType::Null(n) => NotSupportedFieldSnafu {
data_type: ConcreteDataType::Null(n.clone()),
}
.fail(),
}
};
}
deserialize_and_build_value!(self; deserializer;
Boolean, bool,
Int8, i8,
Int16, i16,
Int32, i32,
Int64, i64,
UInt8, u8,
UInt16, u16,
UInt32, u32,
UInt64, u64,
Float32, f32,
Float64, f64,
String, String,
Date, Date,
Time, Time,
DateTime, DateTime,
Duration, Duration,
Decimal128, Decimal128
)
}
/// Skip deserializing this field, returns the length of it.
fn skip_deserialize(
&self,
bytes: &[u8],
deserializer: &mut Deserializer<&[u8]>,
) -> Result<usize> {
let pos = deserializer.position();
if bytes[pos] == 0 {
deserializer.advance(1);
return Ok(1);
}
let to_skip = match &self.data_type {
ConcreteDataType::Boolean(_) => 2,
ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2,
ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3,
ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5,
ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9,
ConcreteDataType::Float32(_) => 5,
ConcreteDataType::Float64(_) => 9,
ConcreteDataType::Binary(_)
| ConcreteDataType::Json(_)
| ConcreteDataType::Vector(_) => {
// Now the encoder encode binary as a list of bytes so we can't use
// skip bytes.
let pos_before = deserializer.position();
let mut current = pos_before + 1;
while bytes[current] == 1 {
current += 2;
}
let to_skip = current - pos_before + 1;
deserializer.advance(to_skip);
return Ok(to_skip);
}
ConcreteDataType::String(_) => {
let pos_before = deserializer.position();
deserializer.advance(1);
deserializer
.skip_bytes()
.context(error::DeserializeFieldSnafu)?;
return Ok(deserializer.position() - pos_before);
}
ConcreteDataType::Date(_) => 5,
ConcreteDataType::DateTime(_) => 9,
ConcreteDataType::Timestamp(_) => 9, // We treat timestamp as Option<i64>
ConcreteDataType::Time(_) => 10, // i64 and 1 byte time unit
ConcreteDataType::Duration(_) => 10,
ConcreteDataType::Interval(IntervalType::YearMonth(_)) => 5,
ConcreteDataType::Interval(IntervalType::DayTime(_)) => 9,
ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => 17,
ConcreteDataType::Decimal128(_) => 19,
ConcreteDataType::Null(_)
| ConcreteDataType::List(_)
| ConcreteDataType::Dictionary(_) => 0,
};
deserializer.advance(to_skip);
Ok(to_skip)
}
}
/// A memory-comparable row [Value] encoder/decoder.
#[derive(Debug)]
pub struct McmpRowCodec {
fields: Vec<SortField>,
}
impl McmpRowCodec {
/// Creates [McmpRowCodec] instance with all primary keys in given `metadata`.
pub fn new_with_primary_keys(metadata: &RegionMetadata) -> Self {
Self::new(
metadata
.primary_key_columns()
.map(|c| SortField::new(c.column_schema.data_type.clone()))
.collect(),
)
}
pub fn new(fields: Vec<SortField>) -> Self {
Self { fields }
}
pub fn num_fields(&self) -> usize {
self.fields.len()
}
/// Estimated length for encoded bytes.
pub fn estimated_size(&self) -> usize {
self.fields.iter().map(|f| f.estimated_size()).sum()
}
/// Decode value at `pos` in `bytes`.
/// Decodes the primary key from the given bytes.
///
/// The i-th element in offsets buffer is how many bytes to skip in order to read value at `pos`.
pub fn decode_value_at(
&self,
bytes: &[u8],
pos: usize,
offsets_buf: &mut Vec<usize>,
) -> Result<Value> {
let mut deserializer = Deserializer::new(bytes);
if pos < offsets_buf.len() {
// We computed the offset before.
let to_skip = offsets_buf[pos];
deserializer.advance(to_skip);
return self.fields[pos].deserialize(&mut deserializer);
}
/// Returns a [`Vec<Value>`] that follows the primary key ordering.
fn decode_dense(&self, bytes: &[u8]) -> Result<Vec<Value>>;
if offsets_buf.is_empty() {
let mut offset = 0;
// Skip values before `pos`.
for i in 0..pos {
// Offset to skip before reading value i.
offsets_buf.push(offset);
let skip = self.fields[i].skip_deserialize(bytes, &mut deserializer)?;
offset += skip;
}
// Offset to skip before reading this value.
offsets_buf.push(offset);
} else {
// Offsets are not enough.
let value_start = offsets_buf.len() - 1;
// Advances to decode value at `value_start`.
let mut offset = offsets_buf[value_start];
deserializer.advance(offset);
for i in value_start..pos {
// Skip value i.
let skip = self.fields[i].skip_deserialize(bytes, &mut deserializer)?;
// Offset for the value at i + 1.
offset += skip;
offsets_buf.push(offset);
}
}
self.fields[pos].deserialize(&mut deserializer)
}
}
impl RowCodec for McmpRowCodec {
fn encode<'a, I>(&self, row: I) -> Result<Vec<u8>>
where
I: Iterator<Item = ValueRef<'a>>,
{
let mut buffer = Vec::new();
self.encode_to_vec(row, &mut buffer)?;
Ok(buffer)
}
fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
where
I: Iterator<Item = ValueRef<'a>>,
{
buffer.reserve(self.estimated_size());
let mut serializer = Serializer::new(buffer);
for (value, field) in row.zip(self.fields.iter()) {
field.serialize(&mut serializer, &value)?;
}
Ok(())
}
fn decode(&self, bytes: &[u8]) -> Result<Vec<Value>> {
let mut deserializer = Deserializer::new(bytes);
let mut values = Vec::with_capacity(self.fields.len());
for f in &self.fields {
let value = f.deserialize(&mut deserializer)?;
values.push(value);
}
Ok(values)
}
}
#[cfg(test)]
mod tests {
use common_base::bytes::StringBytes;
use common_time::{
DateTime, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp,
};
use datatypes::value::Value;
use super::*;
fn check_encode_and_decode(data_types: &[ConcreteDataType], row: Vec<Value>) {
let encoder = McmpRowCodec::new(
data_types
.iter()
.map(|t| SortField::new(t.clone()))
.collect::<Vec<_>>(),
);
let value_ref = row.iter().map(|v| v.as_value_ref()).collect::<Vec<_>>();
let result = encoder.encode(value_ref.iter().cloned()).unwrap();
let decoded = encoder.decode(&result).unwrap();
assert_eq!(decoded, row);
let mut decoded = Vec::new();
let mut offsets = Vec::new();
// Iter two times to test offsets buffer.
for _ in 0..2 {
decoded.clear();
for i in 0..data_types.len() {
let value = encoder.decode_value_at(&result, i, &mut offsets).unwrap();
decoded.push(value);
}
assert_eq!(data_types.len(), offsets.len(), "offsets: {:?}", offsets);
assert_eq!(decoded, row);
}
}
#[test]
fn test_memcmp() {
let encoder = McmpRowCodec::new(vec![
SortField::new(ConcreteDataType::string_datatype()),
SortField::new(ConcreteDataType::int64_datatype()),
]);
let values = [Value::String("abcdefgh".into()), Value::Int64(128)];
let value_ref = values.iter().map(|v| v.as_value_ref()).collect::<Vec<_>>();
let result = encoder.encode(value_ref.iter().cloned()).unwrap();
let decoded = encoder.decode(&result).unwrap();
assert_eq!(&values, &decoded as &[Value]);
}
#[test]
fn test_memcmp_timestamp() {
check_encode_and_decode(
&[
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::int64_datatype(),
],
vec![
Value::Timestamp(Timestamp::new_millisecond(42)),
Value::Int64(43),
],
);
}
#[test]
fn test_memcmp_duration() {
check_encode_and_decode(
&[
ConcreteDataType::duration_millisecond_datatype(),
ConcreteDataType::int64_datatype(),
],
vec![
Value::Duration(Duration::new_millisecond(44)),
Value::Int64(45),
],
)
}
#[test]
fn test_memcmp_binary() {
check_encode_and_decode(
&[
ConcreteDataType::binary_datatype(),
ConcreteDataType::int64_datatype(),
],
vec![
Value::Binary(Bytes::from("hello".as_bytes())),
Value::Int64(43),
],
);
}
#[test]
fn test_memcmp_string() {
check_encode_and_decode(
&[ConcreteDataType::string_datatype()],
vec![Value::String(StringBytes::from("hello"))],
);
check_encode_and_decode(&[ConcreteDataType::string_datatype()], vec![Value::Null]);
check_encode_and_decode(
&[ConcreteDataType::string_datatype()],
vec![Value::String("".into())],
);
check_encode_and_decode(
&[ConcreteDataType::string_datatype()],
vec![Value::String("world".into())],
);
}
#[test]
fn test_encode_null() {
check_encode_and_decode(
&[
ConcreteDataType::string_datatype(),
ConcreteDataType::int32_datatype(),
],
vec![Value::String(StringBytes::from("abcd")), Value::Null],
)
}
#[test]
fn test_encode_multiple_rows() {
check_encode_and_decode(
&[
ConcreteDataType::string_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::boolean_datatype(),
],
vec![
Value::String("hello".into()),
Value::Int64(42),
Value::Boolean(false),
],
);
check_encode_and_decode(
&[
ConcreteDataType::string_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::boolean_datatype(),
],
vec![
Value::String("world".into()),
Value::Int64(43),
Value::Boolean(true),
],
);
check_encode_and_decode(
&[
ConcreteDataType::string_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::boolean_datatype(),
],
vec![Value::Null, Value::Int64(43), Value::Boolean(true)],
);
// All types.
check_encode_and_decode(
&[
ConcreteDataType::boolean_datatype(),
ConcreteDataType::int8_datatype(),
ConcreteDataType::uint8_datatype(),
ConcreteDataType::int16_datatype(),
ConcreteDataType::uint16_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::uint32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::uint64_datatype(),
ConcreteDataType::float32_datatype(),
ConcreteDataType::float64_datatype(),
ConcreteDataType::binary_datatype(),
ConcreteDataType::string_datatype(),
ConcreteDataType::date_datatype(),
ConcreteDataType::datetime_datatype(),
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::time_millisecond_datatype(),
ConcreteDataType::duration_millisecond_datatype(),
ConcreteDataType::interval_year_month_datatype(),
ConcreteDataType::interval_day_time_datatype(),
ConcreteDataType::interval_month_day_nano_datatype(),
ConcreteDataType::decimal128_default_datatype(),
ConcreteDataType::vector_datatype(3),
],
vec![
Value::Boolean(true),
Value::Int8(8),
Value::UInt8(8),
Value::Int16(16),
Value::UInt16(16),
Value::Int32(32),
Value::UInt32(32),
Value::Int64(64),
Value::UInt64(64),
Value::Float32(1.0.into()),
Value::Float64(1.0.into()),
Value::Binary(b"hello"[..].into()),
Value::String("world".into()),
Value::Date(Date::new(10)),
Value::DateTime(DateTime::new(11)),
Value::Timestamp(Timestamp::new_millisecond(12)),
Value::Time(Time::new_millisecond(13)),
Value::Duration(Duration::new_millisecond(14)),
Value::IntervalYearMonth(IntervalYearMonth::new(1)),
Value::IntervalDayTime(IntervalDayTime::new(1, 15)),
Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 15)),
Value::Decimal128(Decimal128::from(16)),
Value::Binary(Bytes::from(vec![0; 12])),
],
);
}
/// Decode the leftmost value from bytes.
fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>>;
}
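
Summary of the split in this file: PrimaryKeyCodecExt keeps the old iterator-based encode/encode_to_vec helpers, while the new object-safe PrimaryKeyCodec trait carries the encoding-agnostic surface (encode_key_value, encode_values, decode_dense, decode_leftmost, primary_key_filter, encoding). A round-trip sketch through the dense codec, assuming the derived trait impls needed for the assertions:

use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use mito2::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use store_api::codec::PrimaryKeyEncoding;

fn round_trip() {
    let codec = DensePrimaryKeyCodec::with_fields(vec![
        SortField::new(ConcreteDataType::string_datatype()),
        SortField::new(ConcreteDataType::int64_datatype()),
    ]);
    // The codec reports which encoding it implements.
    assert_eq!(codec.encoding(), PrimaryKeyEncoding::Dense);

    let values = [Value::String("host-1".into()), Value::Int64(42)];
    // Ext trait: encode from an iterator of ValueRef.
    let bytes = codec.encode(values.iter().map(|v| v.as_value_ref())).unwrap();
    // New trait: decode the full dense key back into values.
    assert_eq!(codec.decode_dense(&bytes).unwrap(), values.to_vec());
}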

View File

@@ -0,0 +1,679 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use bytes::Buf;
use common_base::bytes::Bytes;
use common_decimal::Decimal128;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_time::time::Time;
use common_time::{Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::Value;
use datatypes::types::IntervalType;
use datatypes::value::ValueRef;
use memcomparable::{Deserializer, Serializer};
use paste::paste;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use super::PrimaryKeyFilter;
use crate::error::{
self, FieldTypeMismatchSnafu, NotSupportedFieldSnafu, Result, SerializeFieldSnafu,
};
use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::DensePrimaryKeyFilter;
use crate::row_converter::{PrimaryKeyCodec, PrimaryKeyCodecExt};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
pub(crate) data_type: ConcreteDataType,
}
impl SortField {
pub fn new(data_type: ConcreteDataType) -> Self {
Self { data_type }
}
pub fn estimated_size(&self) -> usize {
match &self.data_type {
ConcreteDataType::Boolean(_) => 2,
ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2,
ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3,
ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5,
ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9,
ConcreteDataType::Float32(_) => 5,
ConcreteDataType::Float64(_) => 9,
ConcreteDataType::Binary(_)
| ConcreteDataType::Json(_)
| ConcreteDataType::Vector(_) => 11,
ConcreteDataType::String(_) => 11, // a non-empty string takes at least 11 bytes.
ConcreteDataType::Date(_) => 5,
ConcreteDataType::DateTime(_) => 9,
ConcreteDataType::Timestamp(_) => 10,
ConcreteDataType::Time(_) => 10,
ConcreteDataType::Duration(_) => 10,
ConcreteDataType::Interval(_) => 18,
ConcreteDataType::Decimal128(_) => 19,
ConcreteDataType::Null(_)
| ConcreteDataType::List(_)
| ConcreteDataType::Dictionary(_) => 0,
}
}
}
impl SortField {
pub(crate) fn serialize(
&self,
serializer: &mut Serializer<&mut Vec<u8>>,
value: &ValueRef,
) -> Result<()> {
macro_rules! cast_value_and_serialize {
(
$self: ident;
$serializer: ident;
$(
$ty: ident, $f: ident
),*
) => {
match &$self.data_type {
$(
ConcreteDataType::$ty(_) => {
paste!{
value
.[<as_ $f>]()
.context(FieldTypeMismatchSnafu)?
.serialize($serializer)
.context(SerializeFieldSnafu)?;
}
}
)*
ConcreteDataType::Timestamp(_) => {
let timestamp = value.as_timestamp().context(FieldTypeMismatchSnafu)?;
timestamp
.map(|t|t.value())
.serialize($serializer)
.context(SerializeFieldSnafu)?;
}
ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
let interval = value.as_interval_year_month().context(FieldTypeMismatchSnafu)?;
interval.map(|i| i.to_i32())
.serialize($serializer)
.context(SerializeFieldSnafu)?;
}
ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
let interval = value.as_interval_day_time().context(FieldTypeMismatchSnafu)?;
interval.map(|i| i.to_i64())
.serialize($serializer)
.context(SerializeFieldSnafu)?;
}
ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
let interval = value.as_interval_month_day_nano().context(FieldTypeMismatchSnafu)?;
interval.map(|i| i.to_i128())
.serialize($serializer)
.context(SerializeFieldSnafu)?;
}
ConcreteDataType::List(_) |
ConcreteDataType::Dictionary(_) |
ConcreteDataType::Null(_) => {
return error::NotSupportedFieldSnafu {
data_type: $self.data_type.clone()
}.fail()
}
}
};
}
cast_value_and_serialize!(self; serializer;
Boolean, boolean,
Binary, binary,
Int8, i8,
UInt8, u8,
Int16, i16,
UInt16, u16,
Int32, i32,
UInt32, u32,
Int64, i64,
UInt64, u64,
Float32, f32,
Float64, f64,
String, string,
Date, date,
DateTime, datetime,
Time, time,
Duration, duration,
Decimal128, decimal128,
Json, binary,
Vector, binary
);
Ok(())
}
fn deserialize<B: Buf>(&self, deserializer: &mut Deserializer<B>) -> Result<Value> {
use common_time::DateTime;
macro_rules! deserialize_and_build_value {
(
$self: ident;
$serializer: ident;
$(
$ty: ident, $f: ident
),*
) => {
match &$self.data_type {
$(
ConcreteDataType::$ty(_) => {
Ok(Value::from(Option::<$f>::deserialize(deserializer).context(error::DeserializeFieldSnafu)?))
}
)*
ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) | ConcreteDataType::Vector(_) => Ok(Value::from(
Option::<Vec<u8>>::deserialize(deserializer)
.context(error::DeserializeFieldSnafu)?
.map(Bytes::from),
)),
ConcreteDataType::Timestamp(ty) => {
let timestamp = Option::<i64>::deserialize(deserializer)
.context(error::DeserializeFieldSnafu)?
.map(|t|ty.create_timestamp(t));
Ok(Value::from(timestamp))
}
ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
let interval = Option::<i32>::deserialize(deserializer)
.context(error::DeserializeFieldSnafu)?
.map(IntervalYearMonth::from_i32);
Ok(Value::from(interval))
}
ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
let interval = Option::<i64>::deserialize(deserializer)
.context(error::DeserializeFieldSnafu)?
.map(IntervalDayTime::from_i64);
Ok(Value::from(interval))
}
ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
let interval = Option::<i128>::deserialize(deserializer)
.context(error::DeserializeFieldSnafu)?
.map(IntervalMonthDayNano::from_i128);
Ok(Value::from(interval))
}
ConcreteDataType::List(l) => NotSupportedFieldSnafu {
data_type: ConcreteDataType::List(l.clone()),
}
.fail(),
ConcreteDataType::Dictionary(d) => NotSupportedFieldSnafu {
data_type: ConcreteDataType::Dictionary(d.clone()),
}
.fail(),
ConcreteDataType::Null(n) => NotSupportedFieldSnafu {
data_type: ConcreteDataType::Null(n.clone()),
}
.fail(),
}
};
}
deserialize_and_build_value!(self; deserializer;
Boolean, bool,
Int8, i8,
Int16, i16,
Int32, i32,
Int64, i64,
UInt8, u8,
UInt16, u16,
UInt32, u32,
UInt64, u64,
Float32, f32,
Float64, f64,
String, String,
Date, Date,
Time, Time,
DateTime, DateTime,
Duration, Duration,
Decimal128, Decimal128
)
}
/// Skips deserializing this field and returns the number of bytes skipped.
fn skip_deserialize(
&self,
bytes: &[u8],
deserializer: &mut Deserializer<&[u8]>,
) -> Result<usize> {
let pos = deserializer.position();
if bytes[pos] == 0 {
deserializer.advance(1);
return Ok(1);
}
let to_skip = match &self.data_type {
ConcreteDataType::Boolean(_) => 2,
ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2,
ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3,
ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5,
ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9,
ConcreteDataType::Float32(_) => 5,
ConcreteDataType::Float64(_) => 9,
ConcreteDataType::Binary(_)
| ConcreteDataType::Json(_)
| ConcreteDataType::Vector(_) => {
// The encoder currently encodes binary as a list of bytes, so we cannot use
// `skip_bytes` here and instead walk the per-element markers.
let pos_before = deserializer.position();
let mut current = pos_before + 1;
while bytes[current] == 1 {
current += 2;
}
let to_skip = current - pos_before + 1;
deserializer.advance(to_skip);
return Ok(to_skip);
}
ConcreteDataType::String(_) => {
let pos_before = deserializer.position();
deserializer.advance(1);
deserializer
.skip_bytes()
.context(error::DeserializeFieldSnafu)?;
return Ok(deserializer.position() - pos_before);
}
ConcreteDataType::Date(_) => 5,
ConcreteDataType::DateTime(_) => 9,
ConcreteDataType::Timestamp(_) => 9, // We treat timestamp as Option<i64>
ConcreteDataType::Time(_) => 10, // i64 and 1 byte time unit
ConcreteDataType::Duration(_) => 10,
ConcreteDataType::Interval(IntervalType::YearMonth(_)) => 5,
ConcreteDataType::Interval(IntervalType::DayTime(_)) => 9,
ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => 17,
ConcreteDataType::Decimal128(_) => 19,
ConcreteDataType::Null(_)
| ConcreteDataType::List(_)
| ConcreteDataType::Dictionary(_) => 0,
};
deserializer.advance(to_skip);
Ok(to_skip)
}
}
impl PrimaryKeyCodecExt for DensePrimaryKeyCodec {
fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
where
I: Iterator<Item = ValueRef<'a>>,
{
self.encode_dense(row, buffer)
}
fn decode(&self, bytes: &[u8]) -> Result<Vec<Value>> {
self.decode_dense(bytes)
}
}
/// A memory-comparable [`Value`] encoder/decoder that encodes every primary key column densely, in primary key order.
#[derive(Clone, Debug)]
pub struct DensePrimaryKeyCodec {
/// Primary key fields.
ordered_primary_key_columns: Arc<Vec<SortField>>,
}
impl DensePrimaryKeyCodec {
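/// Creates a codec from the primary key columns of `metadata`, in primary key order.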
pub fn new(metadata: &RegionMetadata) -> Self {
let ordered_primary_key_columns = Arc::new(
metadata
.primary_key_columns()
.map(|c| SortField::new(c.column_schema.data_type.clone()))
.collect::<Vec<_>>(),
);
Self {
ordered_primary_key_columns,
}
}
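/// Creates a codec from the given [`SortField`]s.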
pub fn with_fields(fields: Vec<SortField>) -> Self {
Self {
ordered_primary_key_columns: Arc::new(fields),
}
}
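/// Encodes `row` into `buffer` using the dense memory-comparable format.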
fn encode_dense<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
where
I: Iterator<Item = ValueRef<'a>>,
{
let mut serializer = Serializer::new(buffer);
for (value, field) in row.zip(self.ordered_primary_key_columns.iter()) {
field.serialize(&mut serializer, &value)?;
}
Ok(())
}
/// Decodes the value at index `pos` in the encoded key `bytes`.
///
/// The i-th element of `offsets_buf` caches the number of bytes to skip
/// before reading the i-th value, so repeated lookups can reuse the buffer.
pub fn decode_value_at(
&self,
bytes: &[u8],
pos: usize,
offsets_buf: &mut Vec<usize>,
) -> Result<Value> {
let mut deserializer = Deserializer::new(bytes);
if pos < offsets_buf.len() {
// We computed the offset before.
let to_skip = offsets_buf[pos];
deserializer.advance(to_skip);
return self.ordered_primary_key_columns[pos].deserialize(&mut deserializer);
}
if offsets_buf.is_empty() {
let mut offset = 0;
// Skip values before `pos`.
for i in 0..pos {
// Offset to skip before reading value i.
offsets_buf.push(offset);
let skip = self.ordered_primary_key_columns[i]
.skip_deserialize(bytes, &mut deserializer)?;
offset += skip;
}
// Offset to skip before reading this value.
offsets_buf.push(offset);
} else {
// The cached offsets do not yet reach `pos`.
let value_start = offsets_buf.len() - 1;
// Advances to decode value at `value_start`.
let mut offset = offsets_buf[value_start];
deserializer.advance(offset);
for i in value_start..pos {
// Skip value i.
let skip = self.ordered_primary_key_columns[i]
.skip_deserialize(bytes, &mut deserializer)?;
// Offset for the value at i + 1.
offset += skip;
offsets_buf.push(offset);
}
}
self.ordered_primary_key_columns[pos].deserialize(&mut deserializer)
}
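/// Returns the estimated encoded size of a whole primary key, summed over all fields.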
pub fn estimated_size(&self) -> usize {
self.ordered_primary_key_columns
.iter()
.map(|f| f.estimated_size())
.sum()
}
}
impl PrimaryKeyCodec for DensePrimaryKeyCodec {
fn encode_key_value(&self, key_value: &KeyValue, buffer: &mut Vec<u8>) -> Result<()> {
self.encode_dense(key_value.primary_keys(), buffer)
}
fn encode_values(&self, values: &[Value], buffer: &mut Vec<u8>) -> Result<()> {
self.encode_dense(values.iter().map(|v| v.as_value_ref()), buffer)
}
fn estimated_size(&self) -> Option<usize> {
Some(self.estimated_size())
}
fn num_fields(&self) -> usize {
self.ordered_primary_key_columns.len()
}
fn encoding(&self) -> PrimaryKeyEncoding {
PrimaryKeyEncoding::Dense
}
fn primary_key_filter(
&self,
metadata: &RegionMetadataRef,
filters: Arc<Vec<SimpleFilterEvaluator>>,
) -> Box<dyn PrimaryKeyFilter> {
Box::new(DensePrimaryKeyFilter::new(
metadata.clone(),
filters,
self.clone(),
))
}
fn decode_dense(&self, bytes: &[u8]) -> Result<Vec<Value>> {
let mut deserializer = Deserializer::new(bytes);
let mut values = Vec::with_capacity(self.ordered_primary_key_columns.len());
for f in self.ordered_primary_key_columns.iter() {
let value = f.deserialize(&mut deserializer)?;
values.push(value);
}
Ok(values)
}
fn decode_leftmost(&self, bytes: &[u8]) -> Result<Option<Value>> {
// TODO(weny, yinwen): avoid decoding the whole primary key.
let values = self.decode_dense(bytes)?;
// Return the first (leftmost) primary key column value.
Ok(values.into_iter().next())
}
}
#[cfg(test)]
mod tests {
use common_base::bytes::StringBytes;
use common_time::{
DateTime, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp,
};
use datatypes::value::Value;
use super::*;
fn check_encode_and_decode(data_types: &[ConcreteDataType], row: Vec<Value>) {
let encoder = DensePrimaryKeyCodec::with_fields(
data_types
.iter()
.map(|t| SortField::new(t.clone()))
.collect::<Vec<_>>(),
);
let value_ref = row.iter().map(|v| v.as_value_ref()).collect::<Vec<_>>();
let result = encoder.encode(value_ref.iter().cloned()).unwrap();
let decoded = encoder.decode(&result).unwrap();
assert_eq!(decoded, row);
let mut decoded = Vec::new();
let mut offsets = Vec::new();
// Iterate twice to verify that the offsets buffer can be reused.
for _ in 0..2 {
decoded.clear();
for i in 0..data_types.len() {
let value = encoder.decode_value_at(&result, i, &mut offsets).unwrap();
decoded.push(value);
}
assert_eq!(data_types.len(), offsets.len(), "offsets: {:?}", offsets);
assert_eq!(decoded, row);
}
}
#[test]
fn test_memcmp() {
let encoder = DensePrimaryKeyCodec::with_fields(vec![
SortField::new(ConcreteDataType::string_datatype()),
SortField::new(ConcreteDataType::int64_datatype()),
]);
let values = [Value::String("abcdefgh".into()), Value::Int64(128)];
let value_ref = values.iter().map(|v| v.as_value_ref()).collect::<Vec<_>>();
let result = encoder.encode(value_ref.iter().cloned()).unwrap();
let decoded = encoder.decode(&result).unwrap();
assert_eq!(&values, &decoded as &[Value]);
}
#[test]
fn test_memcmp_timestamp() {
check_encode_and_decode(
&[
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::int64_datatype(),
],
vec![
Value::Timestamp(Timestamp::new_millisecond(42)),
Value::Int64(43),
],
);
}
#[test]
fn test_memcmp_duration() {
check_encode_and_decode(
&[
ConcreteDataType::duration_millisecond_datatype(),
ConcreteDataType::int64_datatype(),
],
vec![
Value::Duration(Duration::new_millisecond(44)),
Value::Int64(45),
],
)
}
#[test]
fn test_memcmp_binary() {
check_encode_and_decode(
&[
ConcreteDataType::binary_datatype(),
ConcreteDataType::int64_datatype(),
],
vec![
Value::Binary(Bytes::from("hello".as_bytes())),
Value::Int64(43),
],
);
}
#[test]
fn test_memcmp_string() {
check_encode_and_decode(
&[ConcreteDataType::string_datatype()],
vec![Value::String(StringBytes::from("hello"))],
);
check_encode_and_decode(&[ConcreteDataType::string_datatype()], vec![Value::Null]);
check_encode_and_decode(
&[ConcreteDataType::string_datatype()],
vec![Value::String("".into())],
);
check_encode_and_decode(
&[ConcreteDataType::string_datatype()],
vec![Value::String("world".into())],
);
}
#[test]
fn test_encode_null() {
check_encode_and_decode(
&[
ConcreteDataType::string_datatype(),
ConcreteDataType::int32_datatype(),
],
vec![Value::String(StringBytes::from("abcd")), Value::Null],
)
}
#[test]
fn test_encode_multiple_rows() {
check_encode_and_decode(
&[
ConcreteDataType::string_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::boolean_datatype(),
],
vec![
Value::String("hello".into()),
Value::Int64(42),
Value::Boolean(false),
],
);
check_encode_and_decode(
&[
ConcreteDataType::string_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::boolean_datatype(),
],
vec![
Value::String("world".into()),
Value::Int64(43),
Value::Boolean(true),
],
);
check_encode_and_decode(
&[
ConcreteDataType::string_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::boolean_datatype(),
],
vec![Value::Null, Value::Int64(43), Value::Boolean(true)],
);
// All types.
check_encode_and_decode(
&[
ConcreteDataType::boolean_datatype(),
ConcreteDataType::int8_datatype(),
ConcreteDataType::uint8_datatype(),
ConcreteDataType::int16_datatype(),
ConcreteDataType::uint16_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::uint32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::uint64_datatype(),
ConcreteDataType::float32_datatype(),
ConcreteDataType::float64_datatype(),
ConcreteDataType::binary_datatype(),
ConcreteDataType::string_datatype(),
ConcreteDataType::date_datatype(),
ConcreteDataType::datetime_datatype(),
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::time_millisecond_datatype(),
ConcreteDataType::duration_millisecond_datatype(),
ConcreteDataType::interval_year_month_datatype(),
ConcreteDataType::interval_day_time_datatype(),
ConcreteDataType::interval_month_day_nano_datatype(),
ConcreteDataType::decimal128_default_datatype(),
ConcreteDataType::vector_datatype(3),
],
vec![
Value::Boolean(true),
Value::Int8(8),
Value::UInt8(8),
Value::Int16(16),
Value::UInt16(16),
Value::Int32(32),
Value::UInt32(32),
Value::Int64(64),
Value::UInt64(64),
Value::Float32(1.0.into()),
Value::Float64(1.0.into()),
Value::Binary(b"hello"[..].into()),
Value::String("world".into()),
Value::Date(Date::new(10)),
Value::DateTime(DateTime::new(11)),
Value::Timestamp(Timestamp::new_millisecond(12)),
Value::Time(Time::new_millisecond(13)),
Value::Duration(Duration::new_millisecond(14)),
Value::IntervalYearMonth(IntervalYearMonth::new(1)),
Value::IntervalDayTime(IntervalDayTime::new(1, 15)),
Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 15)),
Value::Decimal128(Decimal128::from(16)),
Value::Binary(Bytes::from(vec![0; 12])),
],
);
}
}
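
As a quick reference for reviewers, the sketch below shows the round trip the dense codec provides. It is a minimal, illustrative snippet only: it assumes it is compiled inside `mito2` (so the `crate::row_converter` paths and the `PrimaryKeyCodecExt` extension trait shown above are in scope), and the function and variable names are made up.

use datatypes::prelude::ConcreteDataType;
use datatypes::value::{Value, ValueRef};

use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};

fn dense_codec_round_trip() {
    // A (String, Int64) primary key.
    let codec = DensePrimaryKeyCodec::with_fields(vec![
        SortField::new(ConcreteDataType::string_datatype()),
        SortField::new(ConcreteDataType::int64_datatype()),
    ]);

    // Encode one key; the produced bytes are memory-comparable.
    let encoded = codec
        .encode([ValueRef::String("host-1"), ValueRef::Int64(42)].into_iter())
        .unwrap();

    // Decode the whole key back.
    let values = codec.decode(&encoded).unwrap();
    assert_eq!(values, vec![Value::String("host-1".into()), Value::Int64(42)]);

    // Or decode a single column, reusing the offsets buffer across lookups.
    let mut offsets = Vec::new();
    let v = codec.decode_value_at(&encoded, 1, &mut offsets).unwrap();
    assert_eq!(v, Value::Int64(42));
}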

View File

@@ -338,7 +338,7 @@ pub(crate) mod tests {
use super::*;
use crate::read::BatchColumn;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::index::puffin_manager::PuffinManagerFactory;
pub fn mock_object_store() -> ObjectStore {
@@ -412,7 +412,7 @@ pub(crate) mod tests {
pub fn new_batch(str_tag: impl AsRef<str>, u64_field: impl IntoIterator<Item = u64>) -> Batch {
let fields = vec![SortField::new(ConcreteDataType::string_datatype())];
let codec = McmpRowCodec::new(fields);
let codec = DensePrimaryKeyCodec::with_fields(fields);
let row: [ValueRef; 1] = [str_tag.as_ref().into()];
let primary_key = codec.encode(row.into_iter()).unwrap();

View File

@@ -20,7 +20,7 @@ use store_api::metadata::ColumnMetadata;
use store_api::storage::ColumnId;
use crate::error::{FieldTypeMismatchSnafu, IndexEncodeNullSnafu, Result};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, SortField};
/// Encodes index values according to their data types for sorting and storage use.
pub struct IndexValueCodec;
@@ -66,7 +66,7 @@ pub struct IndexValuesCodec {
/// The data types of tag columns.
fields: Vec<SortField>,
/// The decoder for the primary key.
decoder: McmpRowCodec,
decoder: DensePrimaryKeyCodec,
}
impl IndexValuesCodec {
@@ -81,7 +81,7 @@ impl IndexValuesCodec {
})
.unzip();
let decoder = McmpRowCodec::new(fields.clone());
let decoder = DensePrimaryKeyCodec::with_fields(fields.clone());
Self {
column_ids,
fields,
@@ -94,7 +94,7 @@ impl IndexValuesCodec {
&self,
primary_key: &[u8],
) -> Result<impl Iterator<Item = (&(ColumnId, String), &SortField, Option<Value>)>> {
let values = self.decoder.decode(primary_key)?;
let values = self.decoder.decode_dense(primary_key)?;
let iter = values
.into_iter()
@@ -119,6 +119,7 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::row_converter::{PrimaryKeyCodecExt, SortField};
#[test]
fn test_encode_value_basic() {
@@ -165,7 +166,7 @@ mod tests {
},
];
let primary_key = McmpRowCodec::new(vec![
let primary_key = DensePrimaryKeyCodec::with_fields(vec![
SortField::new(ConcreteDataType::string_datatype()),
SortField::new(ConcreteDataType::int64_datatype()),
])

View File

@@ -318,7 +318,7 @@ mod tests {
use crate::cache::index::inverted_index::InvertedIndexCache;
use crate::metrics::CACHE_BYTES;
use crate::read::BatchColumn;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location;
@@ -384,7 +384,7 @@ mod tests {
SortField::new(ConcreteDataType::string_datatype()),
SortField::new(ConcreteDataType::int32_datatype()),
];
let codec = McmpRowCodec::new(fields);
let codec = DensePrimaryKeyCodec::with_fields(fields);
let row: [ValueRef; 2] = [str_tag.as_ref().into(), i32_tag.into().into()];
let primary_key = codec.encode(row.into_iter()).unwrap();

View File

@@ -33,7 +33,7 @@ use crate::read::compat::CompatBatch;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::prune::PruneReader;
use crate::read::Batch;
use crate::row_converter::{McmpRowCodec, RowCodec};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::file::FileHandle;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext};
@@ -156,7 +156,7 @@ impl FileRangeContext {
reader_builder: RowGroupReaderBuilder,
filters: Vec<SimpleFilterContext>,
read_format: ReadFormat,
codec: McmpRowCodec,
codec: DensePrimaryKeyCodec,
) -> Self {
Self {
reader_builder,
@@ -241,7 +241,7 @@ pub(crate) struct RangeBase {
/// Helper to read the SST.
pub(crate) read_format: ReadFormat,
/// Decoder for primary keys
pub(crate) codec: McmpRowCodec,
pub(crate) codec: DensePrimaryKeyCodec,
/// Optional helper to compat batches.
pub(crate) compat_batch: Option<CompatBatch>,
}

View File

@@ -48,7 +48,7 @@ use crate::error::{
ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, SortField};
use crate::sst::file::{FileMeta, FileTimeRange};
use crate::sst::to_sst_arrow_schema;
@@ -402,8 +402,9 @@ impl ReadFormat {
return None;
}
let converter =
McmpRowCodec::new(vec![SortField::new(column.column_schema.data_type.clone())]);
let converter = DensePrimaryKeyCodec::with_fields(vec![SortField::new(
column.column_schema.data_type.clone(),
)]);
let values = row_groups.iter().map(|meta| {
let stats = meta
.borrow()
@@ -421,8 +422,7 @@ impl ReadFormat {
Statistics::Double(_) => None,
Statistics::ByteArray(s) => {
let bytes = if is_min { s.min_bytes() } else { s.max_bytes() };
let mut values = converter.decode(bytes).ok()?;
values.pop()
converter.decode_leftmost(bytes).ok()?
}
Statistics::FixedLenByteArray(_) => None,
}

View File

@@ -49,7 +49,7 @@ use crate::metrics::{
};
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::row_converter::{McmpRowCodec, SortField};
use crate::row_converter::DensePrimaryKeyCodec;
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
@@ -253,13 +253,7 @@ impl ParquetReaderBuilder {
vec![]
};
let codec = McmpRowCodec::new(
read_format
.metadata()
.primary_key_columns()
.map(|c| SortField::new(c.column_schema.data_type.clone()))
.collect(),
);
let codec = DensePrimaryKeyCodec::new(read_format.metadata());
let context = FileRangeContext::new(reader_builder, filters, read_format, codec);

View File

@@ -36,7 +36,7 @@ use crate::memtable::{
BoxedBatchIterator, BulkPart, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRanges,
MemtableRef, MemtableStats,
};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
/// Empty memtable for test.
#[derive(Debug, Default)]
@@ -313,12 +313,7 @@ pub(crate) fn encode_keys(
key_values: &KeyValues,
keys: &mut Vec<Vec<u8>>,
) {
let row_codec = McmpRowCodec::new(
metadata
.primary_key_columns()
.map(|c| SortField::new(c.column_schema.data_type.clone()))
.collect(),
);
let row_codec = DensePrimaryKeyCodec::new(metadata);
for kv in key_values.iter() {
let key = row_codec.encode(kv.primary_keys()).unwrap();
keys.push(key);
@@ -327,7 +322,7 @@ pub(crate) fn encode_keys(
/// Encode one key.
pub(crate) fn encode_key_by_kv(key_value: &KeyValue) -> Vec<u8> {
let row_codec = McmpRowCodec::new(vec![
let row_codec = DensePrimaryKeyCodec::with_fields(vec![
SortField::new(ConcreteDataType::string_datatype()),
SortField::new(ConcreteDataType::uint32_datatype()),
]);

View File

@@ -29,7 +29,7 @@ use store_api::metadata::{
use store_api::storage::RegionId;
use crate::read::{Batch, BatchBuilder, Source};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::test_util::{new_batch_builder, new_noop_file_purger, VecBatchReader};
@@ -87,7 +87,7 @@ pub fn new_primary_key(tags: &[&str]) -> Vec<u8> {
let fields = (0..tags.len())
.map(|_| SortField::new(ConcreteDataType::string_datatype()))
.collect();
let converter = McmpRowCodec::new(fields);
let converter = DensePrimaryKeyCodec::with_fields(fields);
converter
.encode(tags.iter().map(|tag| ValueRef::String(tag)))
.unwrap()

View File

@@ -17,13 +17,12 @@
use std::sync::Arc;
use common_telemetry::info;
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataBuilder;
use store_api::region_request::{AffectedRows, RegionCreateRequest};
use store_api::storage::RegionId;
use crate::error::{InvalidMetadataSnafu, Result};
use crate::error::Result;
use crate::region::opener::{check_recovered_region, RegionOpener};
use crate::worker::RegionWorkerLoop;
@@ -52,7 +51,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
builder.push_column_metadata(column);
}
builder.primary_key(request.primary_key);
let metadata = builder.build().context(InvalidMetadataSnafu)?;
// Create a MitoRegion from the RegionMetadata.
let region = RegionOpener::new(
region_id,
@@ -64,7 +63,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.intermediate_manager.clone(),
self.time_provider.clone(),
)
.metadata(metadata)
.metadata_builder(builder)
.parse_options(request.options)?
.cache(Some(self.cache_manager.clone()))
.create_or_open(&self.config, &self.wal)

View File

@@ -0,0 +1,26 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize};
/// Primary key encoding mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PrimaryKeyEncoding {
#[default]
/// Dense primary key encoding.
Dense,
/// Sparse primary key encoding.
Sparse,
}
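
A minimal sketch of what this enum looks like on the wire, assuming `serde_json` is available in the caller; the snake_case names match the `"primary_key_encoding":"sparse"` JSON accepted by the region metadata test further below.

use store_api::codec::PrimaryKeyEncoding;

fn encoding_json_repr() {
    // `rename_all = "snake_case"` yields lowercase variant names in JSON.
    assert_eq!(
        serde_json::to_string(&PrimaryKeyEncoding::Sparse).unwrap(),
        r#""sparse""#
    );
    // Dense is the default, so older metadata without the field still decodes.
    assert_eq!(PrimaryKeyEncoding::default(), PrimaryKeyEncoding::Dense);
}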

View File

@@ -15,6 +15,7 @@
//! Storage related APIs
pub mod codec;
pub mod data_source;
pub mod logstore;
pub mod manifest;

View File

@@ -33,6 +33,7 @@ use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{ensure, Location, OptionExt, ResultExt, Snafu};
use crate::codec::PrimaryKeyEncoding;
use crate::region_request::{AddColumn, AddColumnLocation, AlterKind, ModifyColumnType};
use crate::storage::consts::is_internal_column;
use crate::storage::{ColumnId, RegionId};
@@ -145,6 +146,9 @@ pub struct RegionMetadata {
///
/// The version starts from 0. Altering the schema bumps the version.
pub schema_version: u64,
/// Primary key encoding mode.
pub primary_key_encoding: PrimaryKeyEncoding,
}
impl fmt::Debug for RegionMetadata {
@@ -173,6 +177,8 @@ impl<'de> Deserialize<'de> for RegionMetadata {
primary_key: Vec<ColumnId>,
region_id: RegionId,
schema_version: u64,
#[serde(default)]
primary_key_encoding: PrimaryKeyEncoding,
}
let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
@@ -187,6 +193,7 @@ impl<'de> Deserialize<'de> for RegionMetadata {
primary_key: without_schema.primary_key,
region_id: without_schema.region_id,
schema_version: without_schema.schema_version,
primary_key_encoding: without_schema.primary_key_encoding,
})
}
}
@@ -320,6 +327,7 @@ impl RegionMetadata {
primary_key: projected_primary_key,
region_id: self.region_id,
schema_version: self.schema_version,
primary_key_encoding: self.primary_key_encoding,
})
}
@@ -504,6 +512,7 @@ pub struct RegionMetadataBuilder {
column_metadatas: Vec<ColumnMetadata>,
primary_key: Vec<ColumnId>,
schema_version: u64,
primary_key_encoding: PrimaryKeyEncoding,
}
impl RegionMetadataBuilder {
@@ -514,6 +523,7 @@ impl RegionMetadataBuilder {
column_metadatas: vec![],
primary_key: vec![],
schema_version: 0,
primary_key_encoding: PrimaryKeyEncoding::Dense,
}
}
@@ -524,9 +534,16 @@ impl RegionMetadataBuilder {
primary_key: existing.primary_key,
region_id: existing.region_id,
schema_version: existing.schema_version,
primary_key_encoding: existing.primary_key_encoding,
}
}
/// Sets the primary key encoding mode.
pub fn primary_key_encoding(&mut self, encoding: PrimaryKeyEncoding) -> &mut Self {
self.primary_key_encoding = encoding;
self
}
/// Pushes a new column metadata to this region's metadata.
pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
self.column_metadatas.push(column_metadata);
@@ -582,6 +599,7 @@ impl RegionMetadataBuilder {
primary_key: self.primary_key,
region_id: self.region_id,
schema_version: self.schema_version,
primary_key_encoding: self.primary_key_encoding,
};
meta.validate()?;
@@ -1515,4 +1533,18 @@ mod test {
let formatted = format!("{:?}", region_metadata);
assert_eq!(formatted, "RegionMetadata { column_metadatas: [[a Int64 not null Tag 1], [b Float64 not null Field 2], [c TimestampMillisecond not null Timestamp 3]], time_index: 3, primary_key: [1], region_id: 5299989648942(1234, 5678), schema_version: 0 }");
}
#[test]
fn test_region_metadata_deserialize_default_primary_key_encoding() {
let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0}"#;
let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
assert_eq!(deserialized.primary_key_encoding, PrimaryKeyEncoding::Dense);
let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0,"primary_key_encoding":"sparse"}"#;
let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
assert_eq!(
deserialized.primary_key_encoding,
PrimaryKeyEncoding::Sparse
);
}
}
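
For completeness, a sketch of how a caller would opt into the new encoding through the builder. The `RegionId` value is illustrative and the column metadata and `build()` call are omitted, so this shows only the setter introduced by this change, under the assumption that `RegionMetadataBuilder::new` takes a `RegionId`.

use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataBuilder;
use store_api::storage::RegionId;

fn sparse_region_metadata() {
    let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
    // Opt into sparse primary key encoding; Dense remains the default.
    builder.primary_key_encoding(PrimaryKeyEncoding::Sparse);
    // Push column metadata, set the primary key, then call `build()` as usual.
}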

View File

@@ -29,7 +29,7 @@ SELECT SUM(region_rows), SUM(disk_size), SUM(sst_size), SUM(index_size)
+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
| SUM(information_schema.region_statistics.region_rows) | SUM(information_schema.region_statistics.disk_size) | SUM(information_schema.region_statistics.sst_size) | SUM(information_schema.region_statistics.index_size) |
+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
| 3 | 2145 | 0 | 0 |
| 3 | 2238 | 0 | 0 |
+-------------------------------------------------------+-----------------------------------------------------+----------------------------------------------------+------------------------------------------------------+
SELECT data_length, index_length, avg_row_length, table_rows FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'test';