perf: Reduce decode overhead when pruning keys in the memtable (#3415)

* feat: reuse value buf

* feat: skip values to decode

* feat: prune shard

* chore: fix compiler errors

* refactor: shard prune metrics

* fix: panic on DedupReader::try_new

* fix: prune after next

* chore: num parts metrics

* feat: metrics and logs

* chore: data build cost

* chore: more logs

* feat: cache skip result

* chore: todo

* fix: index out of bound

* test: test codec

* fix: invalid offsets

* fix: skip binary

* fix: offset buffer reuse

* chore: comment

* test: test memtable filter

* style: fix clippy

* chore: fix compiler error
Author: Yingwen
Committed by: GitHub
Date: 2024-03-08 10:54:00 +08:00
Parent commit: 352bd7b6fd
Commit: 3ee53360ee
7 changed files with 494 additions and 138 deletions
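
The gist of the change: primary-key pruning moves out of PartitionReader and into ShardReader / ShardBuilderReader. Each reader holds an optional PrimaryKeyFilter, remembers the pk index of the last batch it yielded so a run of batches with the same key is evaluated only once, and the filter decodes just the tag values it needs. A minimal, self-contained sketch of that reader-side flow, using simplified stand-in types rather than the actual GreptimeDB structs:

// Hypothetical, simplified types; the real code is in partition.rs, shard.rs
// and shard_builder.rs of this PR.
type PkIndex = u16;

/// Stand-in for a data batch: a primary-key index plus some rows.
struct Batch {
    pk_index: PkIndex,
    rows: Vec<i64>,
}

/// Stand-in for ShardReader: drops whole batches by primary key before they
/// reach the merger, re-evaluating the filter only when the pk index changes.
struct PruningReader<I, F> {
    source: I,
    /// Key dictionary: pk_index -> encoded primary key.
    keys: Vec<Vec<u8>>,
    /// Stand-in for PrimaryKeyFilter::prune_primary_key.
    filter: F,
    last_yield_pk_index: Option<PkIndex>,
}

impl<I, F> Iterator for PruningReader<I, F>
where
    I: Iterator<Item = Batch>,
    F: FnMut(&[u8]) -> bool,
{
    type Item = Batch;

    fn next(&mut self) -> Option<Batch> {
        for batch in self.source.by_ref() {
            // Same key as the last yielded batch: skip filter evaluation.
            if self.last_yield_pk_index == Some(batch.pk_index) {
                return Some(batch);
            }
            if (self.filter)(self.keys[batch.pk_index as usize].as_slice()) {
                self.last_yield_pk_index = Some(batch.pk_index);
                return Some(batch);
            }
            // Key pruned: fall through and advance to the next batch.
        }
        None
    }
}

fn main() {
    let batches = vec![
        Batch { pk_index: 0, rows: vec![1, 2] },
        Batch { pk_index: 1, rows: vec![3] },
        Batch { pk_index: 1, rows: vec![4] },
    ];
    let reader = PruningReader {
        source: batches.into_iter(),
        keys: vec![b"host-a".to_vec(), b"host-b".to_vec()],
        // Keep only keys ending in "b"; the real filter decodes tag values
        // and evaluates SimpleFilterEvaluator against them.
        filter: |key: &[u8]| key.ends_with(b"b"),
        last_yield_pk_index: None,
    };
    let kept: Vec<_> = reader.map(|b| b.rows).collect();
    assert_eq!(kept, vec![vec![3], vec![4]]);
}

The real readers additionally count keys_before_pruning / keys_after_pruning and the prune cost, and report them from their Drop impls, as shown in shard.rs and shard_builder.rs below.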

View File

@@ -293,6 +293,8 @@ mod tests {
use std::collections::BTreeSet;
use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};
@@ -528,4 +530,55 @@ mod tests {
.collect::<Vec<_>>();
assert_eq!(expect, read);
}
#[test]
fn test_memtable_filter() {
let metadata = memtable_util::metadata_with_primary_key(vec![0, 1], false);
// Try to build a memtable via the builder.
let memtable = MergeTreeMemtableBuilder::new(
MergeTreeConfig {
index_max_keys_per_shard: 40,
..Default::default()
},
None,
)
.build(1, &metadata);
for i in 0..100 {
let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
let kvs =
memtable_util::build_key_values(&metadata, "hello".to_string(), i, &timestamps, 1);
memtable.write(&kvs).unwrap();
}
for i in 0..100 {
let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(Column {
relation: None,
name: "k1".to_string(),
})),
op: Operator::Eq,
right: Box::new(Expr::Literal(ScalarValue::UInt32(Some(i)))),
});
let iter = memtable
.iter(None, Some(Predicate::new(vec![expr.into()])))
.unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<Vec<_>>();
assert_eq!(timestamps, read);
}
}
}

View File

@@ -994,9 +994,11 @@ impl DataPartsReaderBuilder {
for p in self.parts {
nodes.push(DataNode::new(DataSource::Part(p)));
}
let num_parts = nodes.len();
let merger = Merger::try_new(nodes)?;
Ok(DataPartsReader {
merger,
num_parts,
elapsed: Default::default(),
})
}
@@ -1005,6 +1007,7 @@ impl DataPartsReaderBuilder {
/// Reader for all parts inside a `DataParts`.
pub struct DataPartsReader {
merger: Merger<DataNode>,
num_parts: usize,
elapsed: Duration,
}
@@ -1032,6 +1035,10 @@ impl DataPartsReader {
pub(crate) fn is_valid(&self) -> bool {
self.merger.is_valid()
}
pub(crate) fn num_parts(&self) -> usize {
self.num_parts
}
}
#[cfg(test)]

View File

@@ -45,7 +45,7 @@ impl<T: DataBatchSource> DataBatchSource for DedupReader<T> {
}
fn next(&mut self) -> Result<()> {
loop {
while self.inner.is_valid() {
match &mut self.prev_batch_last_row {
None => {
// First shot, fill prev_batch_last_row and current_batch_range with first batch.

View File

@@ -123,6 +123,15 @@ impl Partition {
/// Scans data in the partition.
pub fn read(&self, mut context: ReadPartitionContext) -> Result<PartitionReader> {
let key_filter = if context.need_prune_key {
Some(PrimaryKeyFilter::new(
context.metadata.clone(),
context.filters.clone(),
context.row_codec.clone(),
))
} else {
None
};
let (builder_source, shard_reader_builders) = {
let inner = self.inner.read().unwrap();
let mut shard_source = Vec::with_capacity(inner.shards.len() + 1);
@@ -141,14 +150,21 @@ impl Partition {
(builder_reader, shard_source)
};
context.metrics.num_shards = shard_reader_builders.len();
let mut nodes = shard_reader_builders
.into_iter()
.map(|builder| Ok(ShardNode::new(ShardSource::Shard(builder.build()?))))
.map(|builder| {
Ok(ShardNode::new(ShardSource::Shard(
builder.build(key_filter.clone())?,
)))
})
.collect::<Result<Vec<_>>>()?;
if let Some(builder) = builder_source {
context.metrics.read_builder = true;
// Move the initialization of ShardBuilderReader out of read lock.
let shard_builder_reader = builder.build(Some(&context.pk_weights))?;
let shard_builder_reader =
builder.build(Some(&context.pk_weights), key_filter.clone())?;
nodes.push(ShardNode::new(ShardSource::Builder(shard_builder_reader)));
}
@@ -266,11 +282,10 @@ pub(crate) struct PartitionStats {
#[derive(Default)]
struct PartitionReaderMetrics {
prune_pk: Duration,
read_source: Duration,
data_batch_to_batch: Duration,
keys_before_pruning: usize,
keys_after_pruning: usize,
read_builder: bool,
num_shards: usize,
}
/// Reader to scan rows in a partition.
@@ -279,18 +294,11 @@ struct PartitionReaderMetrics {
pub struct PartitionReader {
context: ReadPartitionContext,
source: BoxedDataBatchSource,
last_yield_pk_id: Option<PkId>,
}
impl PartitionReader {
fn new(context: ReadPartitionContext, source: BoxedDataBatchSource) -> Result<Self> {
let mut reader = Self {
context,
source,
last_yield_pk_id: None,
};
// Find next valid batch.
reader.prune_batch_by_key()?;
let reader = Self { context, source };
Ok(reader)
}
@@ -305,8 +313,7 @@ impl PartitionReader {
/// # Panics
/// Panics if the reader is invalid.
pub fn next(&mut self) -> Result<()> {
self.advance_source()?;
self.prune_batch_by_key()
self.advance_source()
}
/// Converts current data batch into a [Batch].
@@ -336,106 +343,77 @@ impl PartitionReader {
self.context.metrics.read_source += read_source.elapsed();
Ok(())
}
fn prune_batch_by_key(&mut self) -> Result<()> {
if self.context.metadata.primary_key.is_empty() || !self.context.need_prune_key {
// Nothing to prune.
return Ok(());
}
while self.source.is_valid() {
let pk_id = self.source.current_pk_id();
if let Some(yield_pk_id) = self.last_yield_pk_id {
if pk_id == yield_pk_id {
// If this batch has the same key as last returned batch.
// We can return it without evaluating filters.
break;
}
}
let key = self.source.current_key().unwrap();
self.context.metrics.keys_before_pruning += 1;
// Prune batch by primary key.
if prune_primary_key(
&self.context.metadata,
&self.context.filters,
&self.context.row_codec,
key,
&mut self.context.metrics,
) {
// We need this key.
self.last_yield_pk_id = Some(pk_id);
self.context.metrics.keys_after_pruning += 1;
break;
}
self.advance_source()?;
}
Ok(())
}
}
fn prune_primary_key(
metadata: &RegionMetadataRef,
filters: &[SimpleFilterEvaluator],
codec: &McmpRowCodec,
pk: &[u8],
metrics: &mut PartitionReaderMetrics,
) -> bool {
let start = Instant::now();
let res = prune_primary_key_inner(metadata, filters, codec, pk);
metrics.prune_pk += start.elapsed();
res
#[derive(Clone)]
pub(crate) struct PrimaryKeyFilter {
metadata: RegionMetadataRef,
filters: Arc<Vec<SimpleFilterEvaluator>>,
codec: Arc<McmpRowCodec>,
offsets_buf: Vec<usize>,
}
// TODO(yingwen): Improve performance of key pruning. Now we need to find index and
// then decode and convert each value.
/// Returns true if the `pk` is still needed.
fn prune_primary_key_inner(
metadata: &RegionMetadataRef,
filters: &[SimpleFilterEvaluator],
codec: &McmpRowCodec,
pk: &[u8],
) -> bool {
if filters.is_empty() {
return true;
impl PrimaryKeyFilter {
pub(crate) fn new(
metadata: RegionMetadataRef,
filters: Arc<Vec<SimpleFilterEvaluator>>,
codec: Arc<McmpRowCodec>,
) -> Self {
Self {
metadata,
filters,
codec,
offsets_buf: Vec::new(),
}
}
// no primary key, we simply return true.
if metadata.primary_key.is_empty() {
return true;
}
let pk_values = match codec.decode(pk) {
Ok(values) => values,
Err(e) => {
common_telemetry::error!(e; "Failed to decode primary key");
pub(crate) fn prune_primary_key(&mut self, pk: &[u8]) -> bool {
if self.filters.is_empty() {
return true;
}
};
// evaluate filters against primary key values
let mut result = true;
for filter in filters {
if Partition::is_partition_column(filter.column_name()) {
continue;
// no primary key, we simply return true.
if self.metadata.primary_key.is_empty() {
return true;
}
let Some(column) = metadata.column_by_name(filter.column_name()) else {
continue;
};
// ignore filters that are not referencing primary key columns
if column.semantic_type != SemanticType::Tag {
continue;
// evaluate filters against primary key values
let mut result = true;
self.offsets_buf.clear();
for filter in &*self.filters {
if Partition::is_partition_column(filter.column_name()) {
continue;
}
let Some(column) = self.metadata.column_by_name(filter.column_name()) else {
continue;
};
// ignore filters that are not referencing primary key columns
if column.semantic_type != SemanticType::Tag {
continue;
}
// index of the column in primary keys.
// Safety: A tag column is always in primary key.
let index = self.metadata.primary_key_index(column.column_id).unwrap();
let value = match self.codec.decode_value_at(pk, index, &mut self.offsets_buf) {
Ok(v) => v,
Err(e) => {
common_telemetry::error!(e; "Failed to decode primary key");
return true;
}
};
// TODO(yingwen): `evaluate_scalar()` creates temporary arrays to compare scalars. Since we use a
// comparable encoding, we could compare the bytes directly without allocating or matching types.
// Safety: arrow schema and datatypes are constructed from the same source.
let scalar_value = value
.try_to_scalar_value(&column.column_schema.data_type)
.unwrap();
result &= filter.evaluate_scalar(&scalar_value).unwrap_or(true);
}
// index of the column in primary keys.
// Safety: A tag column is always in primary key.
let index = metadata.primary_key_index(column.column_id).unwrap();
// Safety: arrow schema and datatypes are constructed from the same source.
let scalar_value = pk_values[index]
.try_to_scalar_value(&column.column_schema.data_type)
.unwrap();
result &= filter.evaluate_scalar(&scalar_value).unwrap_or(true);
result
}
result
}
/// Structs to reuse across readers to avoid allocating for each reader.
@@ -443,7 +421,7 @@ pub(crate) struct ReadPartitionContext {
metadata: RegionMetadataRef,
row_codec: Arc<McmpRowCodec>,
projection: HashSet<ColumnId>,
filters: Vec<SimpleFilterEvaluator>,
filters: Arc<Vec<SimpleFilterEvaluator>>,
/// Buffer to store pk weights.
pk_weights: Vec<u16>,
need_prune_key: bool,
@@ -452,10 +430,6 @@ pub(crate) struct ReadPartitionContext {
impl Drop for ReadPartitionContext {
fn drop(&mut self) {
let partition_prune_pk = self.metrics.prune_pk.as_secs_f64();
MERGE_TREE_READ_STAGE_ELAPSED
.with_label_values(&["partition_prune_pk"])
.observe(partition_prune_pk);
let partition_read_source = self.metrics.read_source.as_secs_f64();
MERGE_TREE_READ_STAGE_ELAPSED
.with_label_values(&["partition_read_source"])
@@ -465,16 +439,13 @@ impl Drop for ReadPartitionContext {
.with_label_values(&["partition_data_batch_to_batch"])
.observe(partition_data_batch_to_batch);
if self.metrics.keys_before_pruning != 0 {
common_telemetry::debug!(
"TreeIter pruning, before: {}, after: {}, partition_read_source: {}s, partition_prune_pk: {}s, partition_data_batch_to_batch: {}s",
self.metrics.keys_before_pruning,
self.metrics.keys_after_pruning,
partition_read_source,
partition_prune_pk,
partition_data_batch_to_batch,
);
}
common_telemetry::debug!(
"TreeIter partitions metrics, read_builder: {}, num_shards: {}, partition_read_source: {}s, partition_data_batch_to_batch: {}s",
self.metrics.read_builder,
self.metrics.num_shards,
partition_read_source,
partition_data_batch_to_batch,
);
}
}
@@ -490,7 +461,7 @@ impl ReadPartitionContext {
metadata,
row_codec,
projection,
filters,
filters: Arc::new(filters),
pk_weights: Vec::new(),
need_prune_key,
metrics: Default::default(),
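
ReadPartitionContext now stores its filters behind an Arc so that a PrimaryKeyFilter (filters, codec, and a reusable offsets_buf) can be handed to every shard reader as a cheap clone: the shared parts are reference-counted while each clone owns its own scratch buffer. A small sketch of that shape, with a stand-in type for the filter list:

use std::sync::Arc;

/// Sketch of the clone-per-reader shape: shared, read-only parts live behind
/// Arc; mutable scratch space is owned by each clone. `Vec<String>` stands in
/// for the real `Vec<SimpleFilterEvaluator>`.
#[derive(Clone)]
struct KeyFilterSketch {
    /// Shared across all readers of a partition (one allocation in total).
    filters: Arc<Vec<String>>,
    /// Per-reader scratch buffer; starts empty in every clone.
    offsets_buf: Vec<usize>,
}

fn main() {
    let filters = Arc::new(vec!["k1 = 5".to_string()]);
    let template = KeyFilterSketch { filters, offsets_buf: Vec::new() };

    // One clone per shard reader: only the Arc's refcount is bumped.
    let mut a = template.clone();
    let b = template.clone();
    a.offsets_buf.push(1); // scratch space stays independent per reader
    assert!(b.offsets_buf.is_empty());
    assert_eq!(Arc::strong_count(&template.filters), 3); // template + a + b
}

Each ShardReader and ShardBuilderReader then receives one such clone via build(key_filter.clone()), so pruning state never crosses readers.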

View File

@@ -15,6 +15,7 @@
//! Shard in a partition.
use std::cmp::Ordering;
use std::time::{Duration, Instant};
use store_api::metadata::RegionMetadataRef;
@@ -25,8 +26,10 @@ use crate::memtable::merge_tree::data::{
};
use crate::memtable::merge_tree::dict::KeyDictRef;
use crate::memtable::merge_tree::merger::{Merger, Node};
use crate::memtable::merge_tree::partition::PrimaryKeyFilter;
use crate::memtable::merge_tree::shard_builder::ShardBuilderReader;
use crate::memtable::merge_tree::{PkId, ShardId};
use crate::memtable::merge_tree::{PkId, PkIndex, ShardId};
use crate::metrics::MERGE_TREE_READ_STAGE_ELAPSED;
/// Shard stores data related to the same key dictionary.
pub struct Shard {
@@ -131,18 +134,15 @@ pub struct ShardReaderBuilder {
}
impl ShardReaderBuilder {
pub(crate) fn build(self) -> Result<ShardReader> {
pub(crate) fn build(self, key_filter: Option<PrimaryKeyFilter>) -> Result<ShardReader> {
let ShardReaderBuilder {
shard_id,
key_dict,
inner,
} = self;
let now = Instant::now();
let parts_reader = inner.build()?;
Ok(ShardReader {
shard_id,
key_dict,
parts_reader,
})
ShardReader::new(shard_id, key_dict, parts_reader, key_filter, now.elapsed())
}
}
@@ -151,15 +151,46 @@ pub struct ShardReader {
shard_id: ShardId,
key_dict: Option<KeyDictRef>,
parts_reader: DataPartsReader,
key_filter: Option<PrimaryKeyFilter>,
last_yield_pk_index: Option<PkIndex>,
keys_before_pruning: usize,
keys_after_pruning: usize,
prune_pk_cost: Duration,
data_build_cost: Duration,
}
impl ShardReader {
fn new(
shard_id: ShardId,
key_dict: Option<KeyDictRef>,
parts_reader: DataPartsReader,
key_filter: Option<PrimaryKeyFilter>,
data_build_cost: Duration,
) -> Result<Self> {
let has_pk = key_dict.is_some();
let mut reader = Self {
shard_id,
key_dict,
parts_reader,
key_filter: if has_pk { key_filter } else { None },
last_yield_pk_index: None,
keys_before_pruning: 0,
keys_after_pruning: 0,
prune_pk_cost: Duration::default(),
data_build_cost,
};
reader.prune_batch_by_key()?;
Ok(reader)
}
fn is_valid(&self) -> bool {
self.parts_reader.is_valid()
}
fn next(&mut self) -> Result<()> {
self.parts_reader.next()
self.parts_reader.next()?;
self.prune_batch_by_key()
}
fn current_key(&self) -> Option<&[u8]> {
@@ -180,6 +211,54 @@ impl ShardReader {
fn current_data_batch(&self) -> DataBatch {
self.parts_reader.current_data_batch()
}
fn prune_batch_by_key(&mut self) -> Result<()> {
let Some(key_filter) = &mut self.key_filter else {
return Ok(());
};
while self.parts_reader.is_valid() {
let pk_index = self.parts_reader.current_data_batch().pk_index();
if let Some(yield_pk_index) = self.last_yield_pk_index {
if pk_index == yield_pk_index {
break;
}
}
self.keys_before_pruning += 1;
// Safety: `key_filter` is some so the shard has primary keys.
let key = self.key_dict.as_ref().unwrap().key_by_pk_index(pk_index);
let now = Instant::now();
if key_filter.prune_primary_key(key) {
self.prune_pk_cost += now.elapsed();
self.last_yield_pk_index = Some(pk_index);
self.keys_after_pruning += 1;
break;
}
self.prune_pk_cost += now.elapsed();
self.parts_reader.next()?;
}
Ok(())
}
}
impl Drop for ShardReader {
fn drop(&mut self) {
let shard_prune_pk = self.prune_pk_cost.as_secs_f64();
MERGE_TREE_READ_STAGE_ELAPSED
.with_label_values(&["shard_prune_pk"])
.observe(shard_prune_pk);
if self.keys_before_pruning > 0 {
common_telemetry::debug!(
"ShardReader metrics, data parts: {}, before pruning: {}, after pruning: {}, prune cost: {}s, build cost: {}s",
self.parts_reader.num_parts(),
self.keys_before_pruning,
self.keys_after_pruning,
shard_prune_pk,
self.data_build_cost.as_secs_f64(),
);
}
}
}
/// A merger that merges batches from multiple shards.
@@ -422,7 +501,7 @@ mod tests {
}
assert!(!shard.is_empty());
let mut reader = shard.read().unwrap().build().unwrap();
let mut reader = shard.read().unwrap().build(None).unwrap();
let mut timestamps = Vec::new();
while reader.is_valid() {
let rb = reader.current_data_batch().slice_record_batch();

View File

@@ -16,6 +16,7 @@
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::{Duration, Instant};
use store_api::metadata::RegionMetadataRef;
@@ -26,8 +27,9 @@ use crate::memtable::merge_tree::data::{
};
use crate::memtable::merge_tree::dict::{DictBuilderReader, KeyDictBuilder};
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::partition::PrimaryKeyFilter;
use crate::memtable::merge_tree::shard::Shard;
use crate::memtable::merge_tree::{MergeTreeConfig, PkId, ShardId};
use crate::memtable::merge_tree::{MergeTreeConfig, PkId, PkIndex, ShardId};
use crate::metrics::MERGE_TREE_READ_STAGE_ELAPSED;
/// Builder to write keys and data to a shard that the key dictionary
@@ -176,13 +178,20 @@ pub(crate) struct ShardBuilderReaderBuilder {
}
impl ShardBuilderReaderBuilder {
pub(crate) fn build(self, pk_weights: Option<&[u16]>) -> Result<ShardBuilderReader> {
pub(crate) fn build(
self,
pk_weights: Option<&[u16]>,
key_filter: Option<PrimaryKeyFilter>,
) -> Result<ShardBuilderReader> {
let now = Instant::now();
let data_reader = self.data_reader.build(pk_weights)?;
Ok(ShardBuilderReader {
shard_id: self.shard_id,
dict_reader: self.dict_reader,
ShardBuilderReader::new(
self.shard_id,
self.dict_reader,
data_reader,
})
key_filter,
now.elapsed(),
)
}
}
@@ -191,15 +200,45 @@ pub struct ShardBuilderReader {
shard_id: ShardId,
dict_reader: DictBuilderReader,
data_reader: DataBufferReader,
key_filter: Option<PrimaryKeyFilter>,
last_yield_pk_index: Option<PkIndex>,
keys_before_pruning: usize,
keys_after_pruning: usize,
prune_pk_cost: Duration,
data_build_cost: Duration,
}
impl ShardBuilderReader {
fn new(
shard_id: ShardId,
dict_reader: DictBuilderReader,
data_reader: DataBufferReader,
key_filter: Option<PrimaryKeyFilter>,
data_build_cost: Duration,
) -> Result<Self> {
let mut reader = ShardBuilderReader {
shard_id,
dict_reader,
data_reader,
key_filter,
last_yield_pk_index: None,
keys_before_pruning: 0,
keys_after_pruning: 0,
prune_pk_cost: Duration::default(),
data_build_cost,
};
reader.prune_batch_by_key()?;
Ok(reader)
}
pub fn is_valid(&self) -> bool {
self.data_reader.is_valid()
}
pub fn next(&mut self) -> Result<()> {
self.data_reader.next()
self.data_reader.next()?;
self.prune_batch_by_key()
}
pub fn current_key(&self) -> Option<&[u8]> {
@@ -218,6 +257,52 @@ impl ShardBuilderReader {
pub fn current_data_batch(&self) -> DataBatch {
self.data_reader.current_data_batch()
}
fn prune_batch_by_key(&mut self) -> Result<()> {
let Some(key_filter) = &mut self.key_filter else {
return Ok(());
};
while self.data_reader.is_valid() {
let pk_index = self.data_reader.current_data_batch().pk_index();
if let Some(yield_pk_index) = self.last_yield_pk_index {
if pk_index == yield_pk_index {
break;
}
}
self.keys_before_pruning += 1;
let key = self.dict_reader.key_by_pk_index(pk_index);
let now = Instant::now();
if key_filter.prune_primary_key(key) {
self.prune_pk_cost += now.elapsed();
self.last_yield_pk_index = Some(pk_index);
self.keys_after_pruning += 1;
break;
}
self.prune_pk_cost += now.elapsed();
self.data_reader.next()?;
}
Ok(())
}
}
impl Drop for ShardBuilderReader {
fn drop(&mut self) {
let shard_builder_prune_pk = self.prune_pk_cost.as_secs_f64();
MERGE_TREE_READ_STAGE_ELAPSED
.with_label_values(&["shard_builder_prune_pk"])
.observe(shard_builder_prune_pk);
if self.keys_before_pruning > 0 {
common_telemetry::debug!(
"ShardBuilderReader metrics, before pruning: {}, after pruning: {}, prune cost: {}s, build cost: {}s",
self.keys_before_pruning,
self.keys_after_pruning,
shard_builder_prune_pk,
self.data_build_cost.as_secs_f64(),
);
}
}
}
#[cfg(test)]
@@ -306,7 +391,7 @@ mod tests {
let mut reader = shard_builder
.read(&mut pk_weights)
.unwrap()
.build(Some(&pk_weights))
.build(Some(&pk_weights), None)
.unwrap();
let mut timestamps = Vec::new();
while reader.is_valid() {

View File

@@ -215,6 +215,61 @@ impl SortField {
Decimal128, Decimal128
)
}
/// Skips deserializing this field and returns its encoded length in bytes.
fn skip_deserialize(
&self,
bytes: &[u8],
deserializer: &mut Deserializer<&[u8]>,
) -> Result<usize> {
let pos = deserializer.position();
// A leading 0 byte marks a null value; only the flag byte is present.
if bytes[pos] == 0 {
deserializer.advance(1);
return Ok(1);
}
let to_skip = match &self.data_type {
ConcreteDataType::Boolean(_) => 2,
ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => 2,
ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => 3,
ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => 5,
ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => 9,
ConcreteDataType::Float32(_) => 5,
ConcreteDataType::Float64(_) => 9,
ConcreteDataType::Binary(_) => {
// The encoder currently encodes binary as a list of bytes, so we can't use
// `skip_bytes` here.
let pos_before = deserializer.position();
let mut current = pos_before + 1;
while bytes[current] == 1 {
current += 2;
}
let to_skip = current - pos_before + 1;
deserializer.advance(to_skip);
return Ok(to_skip);
}
ConcreteDataType::String(_) => {
let pos_before = deserializer.position();
deserializer.advance(1);
deserializer
.skip_bytes()
.context(error::DeserializeFieldSnafu)?;
return Ok(deserializer.position() - pos_before);
}
ConcreteDataType::Date(_) => 5,
ConcreteDataType::DateTime(_) => 9,
ConcreteDataType::Timestamp(_) => 9, // We treat timestamp as Option<i64>
ConcreteDataType::Time(_) => 10, // i64 and 1 byte time unit
ConcreteDataType::Duration(_) => 10,
ConcreteDataType::Interval(_) => 18,
ConcreteDataType::Decimal128(_) => 19,
ConcreteDataType::Null(_)
| ConcreteDataType::List(_)
| ConcreteDataType::Dictionary(_) => 0,
};
deserializer.advance(to_skip);
Ok(to_skip)
}
}
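
skip_deserialize relies on the memcomparable encoding giving most non-null values a fixed width: one flag byte plus a fixed-size payload, so earlier fields can be stepped over without materializing them, while decode_value_at below caches the cumulative skip for each field index in offsets_buf. A rough sketch of that offset caching over fixed-width fields, with a hypothetical helper rather than the codec's API:

/// Sketch: cache cumulative byte offsets so that repeated lookups of a field
/// in the same encoded layout only pay the skipping cost once.
struct FieldSkipper {
    /// Encoded width of each field in bytes (flag byte + payload), assuming
    /// fixed-width fields only; variable-width types need a real skip routine.
    widths: Vec<usize>,
    /// offsets[i] = bytes to skip before reading field i (filled lazily).
    offsets: Vec<usize>,
}

impl FieldSkipper {
    fn new(widths: Vec<usize>) -> Self {
        Self { widths, offsets: Vec::new() }
    }

    /// Returns the byte range of field `pos` inside an encoded key.
    fn field_range(&mut self, pos: usize) -> std::ops::Range<usize> {
        if pos < self.offsets.len() {
            // Offset already computed by an earlier call.
            let start = self.offsets[pos];
            return start..start + self.widths[pos];
        }
        // Extend the cache from the last known offset up to `pos`.
        let mut offset = match self.offsets.last() {
            Some(&last) => last + self.widths[self.offsets.len() - 1],
            None => 0,
        };
        for i in self.offsets.len()..=pos {
            self.offsets.push(offset);
            offset += self.widths[i];
        }
        let start = self.offsets[pos];
        start..start + self.widths[pos]
    }
}

fn main() {
    // e.g. a key encoded as (u32 tag, i64 tag, bool tag): widths 5, 9, 2.
    let mut skipper = FieldSkipper::new(vec![5, 9, 2]);
    assert_eq!(skipper.field_range(2), 14..16); // fills offsets for fields 0, 1, 2
    assert_eq!(skipper.field_range(1), 5..14);  // served from the cache
}

Once the first lookup fills the cache, later lookups of any covered field reduce to a single advance, which is what makes repeated pruning of many keys on the same tag column cheap.
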
/// A memory-comparable row [Value] encoder/decoder.
@@ -236,6 +291,52 @@ impl McmpRowCodec {
pub fn estimated_size(&self) -> usize {
self.fields.iter().map(|f| f.estimated_size()).sum()
}
/// Decodes the value at `pos` in `bytes`.
///
/// The i-th element of `offsets_buf` is how many bytes to skip before reading the i-th value.
pub fn decode_value_at(
&self,
bytes: &[u8],
pos: usize,
offsets_buf: &mut Vec<usize>,
) -> Result<Value> {
let mut deserializer = Deserializer::new(bytes);
if pos < offsets_buf.len() {
// We computed the offset before.
let to_skip = offsets_buf[pos];
deserializer.advance(to_skip);
return self.fields[pos].deserialize(&mut deserializer);
}
if offsets_buf.is_empty() {
let mut offset = 0;
// Skip values before `pos`.
for i in 0..pos {
// Offset to skip before reading value i.
offsets_buf.push(offset);
let skip = self.fields[i].skip_deserialize(bytes, &mut deserializer)?;
offset += skip;
}
// Offset to skip before reading this value.
offsets_buf.push(offset);
} else {
// The cached offsets don't cover `pos` yet; extend them from the last entry.
let value_start = offsets_buf.len() - 1;
// Advance to the start of the value at `value_start`.
let mut offset = offsets_buf[value_start];
deserializer.advance(offset);
for i in value_start..pos {
// Skip value i.
let skip = self.fields[i].skip_deserialize(bytes, &mut deserializer)?;
// Offset for the value at i + 1.
offset += skip;
offsets_buf.push(offset);
}
}
self.fields[pos].deserialize(&mut deserializer)
}
}
impl RowCodec for McmpRowCodec {
@@ -274,7 +375,7 @@ impl RowCodec for McmpRowCodec {
#[cfg(test)]
mod tests {
use common_base::bytes::StringBytes;
use common_time::Timestamp;
use common_time::{DateTime, Timestamp};
use datatypes::value::Value;
use super::*;
@@ -292,6 +393,18 @@ mod tests {
let result = encoder.encode(value_ref.iter().cloned()).unwrap();
let decoded = encoder.decode(&result).unwrap();
assert_eq!(decoded, row);
let mut decoded = Vec::new();
let mut offsets = Vec::new();
// Iterate twice to exercise the cached offsets buffer.
for _ in 0..2 {
decoded.clear();
for i in 0..data_types.len() {
let value = encoder.decode_value_at(&result, i, &mut offsets).unwrap();
decoded.push(value);
}
assert_eq!(data_types.len(), offsets.len(), "offsets: {:?}", offsets);
assert_eq!(decoded, row);
}
}
#[test]
@@ -416,5 +529,53 @@ mod tests {
],
vec![Value::Null, Value::Int64(43), Value::Boolean(true)],
);
// All types.
check_encode_and_decode(
&[
ConcreteDataType::boolean_datatype(),
ConcreteDataType::int8_datatype(),
ConcreteDataType::uint8_datatype(),
ConcreteDataType::int16_datatype(),
ConcreteDataType::uint16_datatype(),
ConcreteDataType::int32_datatype(),
ConcreteDataType::uint32_datatype(),
ConcreteDataType::int64_datatype(),
ConcreteDataType::uint64_datatype(),
ConcreteDataType::float32_datatype(),
ConcreteDataType::float64_datatype(),
ConcreteDataType::binary_datatype(),
ConcreteDataType::string_datatype(),
ConcreteDataType::date_datatype(),
ConcreteDataType::datetime_datatype(),
ConcreteDataType::timestamp_millisecond_datatype(),
ConcreteDataType::time_millisecond_datatype(),
ConcreteDataType::duration_millisecond_datatype(),
ConcreteDataType::interval_month_day_nano_datatype(),
ConcreteDataType::decimal128_default_datatype(),
],
vec![
Value::Boolean(true),
Value::Int8(8),
Value::UInt8(8),
Value::Int16(16),
Value::UInt16(16),
Value::Int32(32),
Value::UInt32(32),
Value::Int64(64),
Value::UInt64(64),
Value::Float32(1.0.into()),
Value::Float64(1.0.into()),
Value::Binary(b"hello"[..].into()),
Value::String("world".into()),
Value::Date(Date::new(10)),
Value::DateTime(DateTime::new(11)),
Value::Timestamp(Timestamp::new_millisecond(12)),
Value::Time(Time::new_millisecond(13)),
Value::Duration(Duration::new_millisecond(14)),
Value::Interval(Interval::from_month_day_nano(1, 1, 15)),
Value::Decimal128(Decimal128::from(16)),
],
);
}
}