Merge pull request #3348

* feat: define functions for partitions

* feat: write partitions

* feat: fork and freeze partition

* feat: create iter by partition

* style: fix clippy

* chore: typos

* feat: add scan method to builder

* feat: check whether the builder should freeze first
This commit is contained in:
Yingwen
2024-02-21 20:50:34 +08:00
committed by GitHub
parent 90169c868d
commit 7c88d721c2
6 changed files with 446 additions and 19 deletions

View File

@@ -94,7 +94,8 @@ impl Memtable for MergeTreeMemtable {
// TODO(yingwen): Validate schema while inserting rows.
let mut metrics = WriteMetrics::default();
let res = self.tree.write(kvs, &mut metrics);
let mut pk_buffer = Vec::new();
let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics);
self.update_stats(&metrics);

View File

@@ -16,11 +16,20 @@
//!
//! We only support partitioning the tree by pre-defined internal columns.
use std::collections::HashSet;
use std::sync::{Arc, RwLock};
use common_recordbatch::filter::SimpleFilterEvaluator;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::storage::ColumnId;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::shard::Shard;
use crate::memtable::merge_tree::shard_builder::ShardBuilder;
use crate::memtable::merge_tree::ShardId;
use crate::memtable::merge_tree::{MergeTreeConfig, PkId, ShardId};
/// Key of a partition.
///
/// `Partition::get_partition_key` reads it from the first primary key value of a row and
/// falls back to `PartitionKey::default()` when the tree is not partitioned or the row
/// yields no primary key value.
pub type PartitionKey = u32;
@@ -30,13 +39,146 @@ pub struct Partition {
inner: RwLock<Inner>,
}
impl Partition {
    /// Creates a new partition.
    ///
    /// # Panics
    /// Not implemented yet; calling it panics.
    pub fn new(_metadata: RegionMetadataRef, _config: &MergeTreeConfig) -> Self {
        unimplemented!()
    }

    /// Writes to the partition with a primary key.
    ///
    /// If one of the existing shards already contains `primary_key`, the row is written to
    /// that shard. Otherwise the row goes to the shard builder; when the builder reports
    /// that it should freeze, it is first finished into a new shard.
    ///
    /// # Errors
    /// Propagates the error of the shard write, of finishing the builder, or of the
    /// builder write.
    ///
    /// # Panics
    /// Panics if the lock guarding the partition state is poisoned.
    pub fn write_with_key(
        &self,
        primary_key: &[u8],
        key_value: KeyValue,
        metrics: &mut WriteMetrics,
    ) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        // Now we ensure one key only exists in one shard.
        if let Some(pk_id) = inner.find_key_in_shards(primary_key) {
            // Key already in shards.
            return inner.write_to_shard(pk_id, key_value);
        }

        if inner.shard_builder.should_freeze() {
            // Turn the builder into a shard before it accepts the new key.
            let shard_id = inner.active_shard_id;
            let shard = inner.shard_builder.finish(shard_id)?;
            inner.active_shard_id += 1;
            inner.shards.push(shard);
        }

        // Write to the shard builder.
        inner
            .shard_builder
            .write_with_key(primary_key, key_value, metrics)
    }

    /// Writes to the partition without a primary key.
    ///
    /// Rows without a primary key always go to the first shard, which is created on
    /// demand without a dictionary.
    ///
    /// # Errors
    /// Propagates the error of the shard write.
    ///
    /// # Panics
    /// Panics if the lock guarding the partition state is poisoned.
    pub fn write_no_key(&self, key_value: KeyValue, metrics: &mut WriteMetrics) -> Result<()> {
        let mut inner = self.inner.write().unwrap();
        // If no primary key, always write to the first shard.
        if inner.shards.is_empty() {
            let shard_id = inner.active_shard_id;
            inner.shards.push(Shard::new_no_dict(shard_id));
            inner.active_shard_id += 1;
        }

        // A dummy pk id.
        // NOTE(review): `active_shard_id - 1` names the first shard only while the partition
        // holds a single shard — confirm this holds for forked partitions.
        let pk_id = PkId {
            shard_id: inner.active_shard_id - 1,
            pk_index: 0,
        };
        inner.shards[0].write_key_value(pk_id, key_value, metrics)
    }

    /// Scans data in the partition.
    ///
    /// # Panics
    /// Not implemented yet; calling it panics.
    pub fn scan(
        &self,
        _projection: HashSet<ColumnId>,
        _filters: Vec<SimpleFilterEvaluator>,
    ) -> Result<PartitionReader> {
        unimplemented!()
    }

    /// Freezes the partition.
    ///
    /// # Panics
    /// Not implemented yet; calling it panics.
    pub fn freeze(&self) -> Result<()> {
        unimplemented!()
    }

    /// Forks the partition.
    ///
    /// # Panics
    /// Not implemented yet; calling it panics.
    pub fn fork(&self, _metadata: &RegionMetadataRef) -> Partition {
        unimplemented!()
    }

    /// Returns true if the partition has data.
    ///
    /// # Panics
    /// Not implemented yet; calling it panics.
    pub fn has_data(&self) -> bool {
        unimplemented!()
    }

    /// Returns shared memory size of the partition.
    ///
    /// # Panics
    /// Not implemented yet; calling it panics.
    pub fn shared_memory_size(&self) -> usize {
        unimplemented!()
    }

    /// Get partition key from the key value.
    ///
    /// Returns `PartitionKey::default()` when `is_partitioned` is false or when the row
    /// yields no primary key value. Otherwise the first primary key value is read as a
    /// `u32`.
    ///
    /// # Panics
    /// Panics if the first primary key value cannot be read as a `u32` or holds no value.
    pub(crate) fn get_partition_key(key_value: &KeyValue, is_partitioned: bool) -> PartitionKey {
        if !is_partitioned {
            return PartitionKey::default();
        }

        let Some(value) = key_value.primary_keys().next() else {
            return PartitionKey::default();
        };

        // State the invariant so a violation produces a readable panic message.
        value
            .as_u32()
            .expect("first primary key value of a partitioned tree must be readable as u32")
            .expect("first primary key value of a partitioned tree must hold a u32")
    }

    /// Returns true if the region can be partitioned.
    ///
    /// That is the case when the first primary key column is named
    /// `DATA_SCHEMA_TABLE_ID_COLUMN_NAME`.
    pub(crate) fn has_multi_partitions(metadata: &RegionMetadataRef) -> bool {
        metadata
            .primary_key_columns()
            .next()
            .map(|meta| meta.column_schema.name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
            .unwrap_or(false)
    }

    /// Returns true if this is a partition column.
    pub(crate) fn is_partition_column(name: &str) -> bool {
        name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
    }
}
/// Reader to scan rows in a partition.
///
/// It can merge rows from multiple shards.
///
/// The struct has no fields yet; it is the return type of `Partition::scan`.
pub struct PartitionReader {}
/// Shared, reference-counted handle to a [`Partition`].
pub type PartitionRef = Arc<Partition>;
/// Inner struct of the partition.
///
/// A key only exists in one shard.
struct Inner {
/// Shard whose dictionary is active.
shard_builder: ShardBuilder,
next_shard_id: ShardId,
/// Id assigned to the next shard pushed into `shards`; incremented after each push.
active_shard_id: ShardId,
/// Shards with frozen dictionary.
shards: Vec<Shard>,
}
impl Inner {
    /// Searches the shards in order and returns the pk id reported by the first shard
    /// that contains `primary_key`, or `None` if no shard has it.
    fn find_key_in_shards(&self, primary_key: &[u8]) -> Option<PkId> {
        self.shards
            .iter()
            .find_map(|shard| shard.find_key(primary_key))
    }

    /// Writes a key value to the shard that owns the given pk id.
    ///
    /// # Panics
    /// Not implemented yet; calling it panics.
    fn write_to_shard(&mut self, _pk_id: PkId, _key_value: KeyValue) -> Result<()> {
        unimplemented!()
    }
}

View File

@@ -14,9 +14,17 @@
//! Shard in a partition.
use std::collections::HashSet;
use common_recordbatch::filter::SimpleFilterEvaluator;
use store_api::storage::ColumnId;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::data::DataParts;
use crate::memtable::merge_tree::dict::KeyDictRef;
use crate::memtable::merge_tree::ShardId;
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::{PkId, ShardId};
/// Shard stores data related to the same key dictionary.
pub struct Shard {
@@ -26,3 +34,37 @@ pub struct Shard {
/// Data in the shard.
data_parts: DataParts,
}
impl Shard {
/// Returns a shard without dictionary.
///
/// # Panics
/// Not implemented yet; calling it panics.
pub fn new_no_dict(_shard_id: ShardId) -> Shard {
unimplemented!()
}
/// Returns the pk id of the key if it exists.
///
/// # Panics
/// Not implemented yet; calling it panics.
pub fn find_key(&self, _key: &[u8]) -> Option<PkId> {
unimplemented!()
}
/// Writes a key value into the shard.
///
/// # Panics
/// Not implemented yet; calling it panics.
pub fn write_key_value(
&mut self,
_pk_id: PkId,
_key_value: KeyValue,
_metrics: &mut WriteMetrics,
) -> Result<()> {
unimplemented!()
}
/// Scans the shard.
///
/// # Panics
/// Not implemented yet; calling it panics.
pub fn scan(
&self,
_projection: &HashSet<ColumnId>,
_filters: &[SimpleFilterEvaluator],
) -> ShardReader {
unimplemented!()
}
}
/// Reader to read rows in a shard.
///
/// The struct has no fields yet; it is the return type of `Shard::scan`.
pub struct ShardReader {}

View File

@@ -14,8 +14,13 @@
//! Builder of a shard.
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::data::DataBuffer;
use crate::memtable::merge_tree::dict::KeyDictBuilder;
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::shard::Shard;
use crate::memtable::merge_tree::ShardId;
/// Builder to write keys and data to a shard that the key dictionary
/// is still active.
@@ -24,4 +29,43 @@ pub struct ShardBuilder {
dict_builder: KeyDictBuilder,
/// Buffer to store data.
data_buffer: DataBuffer,
/// Max keys in an index shard.
index_max_keys_per_shard: usize,
/// Number of rows to freeze a data part.
data_freeze_threshold: usize,
}
impl ShardBuilder {
/// Write a key value with its encoded primary key.
///
/// # Panics
/// Not implemented yet; calling it panics.
pub fn write_with_key(
&mut self,
_key: &[u8],
_key_value: KeyValue,
_metrics: &mut WriteMetrics,
) -> Result<()> {
unimplemented!()
}
/// Returns true if the builder is empty.
///
/// # Panics
/// Not implemented yet; calling it panics.
pub fn is_empty(&self) -> bool {
unimplemented!()
}
/// Returns true if the builder needs to freeze.
///
/// # Panics
/// Not implemented yet; calling it panics.
pub fn should_freeze(&self) -> bool {
unimplemented!()
}
/// Builds a new shard and resets the builder.
///
/// # Panics
/// Not implemented yet; calling it panics.
pub fn finish(&mut self, _shard_id: ShardId) -> Result<Shard> {
unimplemented!()
}
/// Scans the shard builder.
///
/// # Panics
/// Not implemented yet; calling it panics.
pub fn scan(&mut self, _shard_id: ShardId) -> Result<ShardBuilderReader> {
unimplemented!()
}
}
/// Reader to scan a shard builder.
pub struct ShardBuilderReader {}

View File

@@ -14,19 +14,31 @@
//! Implementation of the merge tree.
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::sync::{Arc, RwLock};
use api::v1::OpType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::arrow;
use datatypes::data_type::ConcreteDataType;
use snafu::ensure;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use crate::error::Result;
use crate::error::{PrimaryKeyLengthMismatchSnafu, Result};
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::partition::{PartitionKey, PartitionRef};
use crate::memtable::merge_tree::partition::{
Partition, PartitionKey, PartitionReader, PartitionRef,
};
use crate::memtable::merge_tree::MergeTreeConfig;
use crate::memtable::time_series::primary_key_schema;
use crate::memtable::{BoxedBatchIterator, KeyValues};
use crate::row_converter::{McmpRowCodec, SortField};
use crate::read::Batch;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
/// The merge tree.
pub struct MergeTree {
@@ -38,6 +50,8 @@ pub struct MergeTree {
row_codec: Arc<McmpRowCodec>,
/// Partitions in the tree.
partitions: RwLock<BTreeMap<PartitionKey, PartitionRef>>,
/// Whether the tree has multiple partitions.
is_partitioned: bool,
}
impl MergeTree {
@@ -49,12 +63,14 @@ impl MergeTree {
.map(|c| SortField::new(c.column_schema.data_type.clone()))
.collect(),
);
let is_partitioned = Partition::has_multi_partitions(&metadata);
MergeTree {
config: config.clone(),
metadata,
row_codec: Arc::new(row_codec),
partitions: Default::default(),
is_partitioned,
}
}
@@ -63,39 +79,219 @@ impl MergeTree {
///
/// # Panics
/// Panics if the tree is immutable (frozen).
pub fn write(&self, _kvs: &KeyValues, _metrics: &mut WriteMetrics) -> Result<()> {
todo!()
pub fn write(
    &self,
    kvs: &KeyValues,
    pk_buffer: &mut Vec<u8>,
    metrics: &mut WriteMetrics,
) -> Result<()> {
    // Writes every row of `kvs` into its partition. `pk_buffer` is scratch space that is
    // cleared and refilled with the encoded primary key of each row; `metrics` collects
    // the timestamp range and the value size of the written rows.
    let expected_pk_fields = self.row_codec.num_fields();
    let keyless = self.metadata.primary_key.is_empty();

    for row in kvs.iter() {
        let actual_pk_fields = row.num_primary_keys();
        ensure!(
            actual_pk_fields == expected_pk_fields,
            PrimaryKeyLengthMismatchSnafu {
                expect: expected_pk_fields,
                actual: actual_pk_fields,
            }
        );

        // Safety: timestamp of kv must be both present and a valid timestamp value.
        let ts = row.timestamp().as_timestamp().unwrap().unwrap().value();
        metrics.min_ts = metrics.min_ts.min(ts);
        metrics.max_ts = metrics.max_ts.max(ts);

        let field_bytes: usize = row.fields().map(|field| field.data_size()).sum();
        metrics.value_bytes += field_bytes;

        if keyless {
            // No primary key.
            self.write_no_key(row, metrics)?;
        } else {
            // Encode the primary key into the reused buffer, then write the row with it.
            pk_buffer.clear();
            self.row_codec.encode_to_vec(row.primary_keys(), pk_buffer)?;
            self.write_with_key(pk_buffer, row, metrics)?;
        }
    }

    // Account for one `Timestamp` and one `OpType` per row.
    let per_row_overhead = std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
    metrics.value_bytes += kvs.num_rows() * per_row_overhead;

    Ok(())
}
/// Scans the tree.
///
/// Uses all field columns when `projection` is `None`. Predicate expressions that cannot
/// be turned into a `SimpleFilterEvaluator` are dropped. The returned iterator is a boxed
/// `TreeIter` over the partitions selected by `prune_partitions`.
pub fn scan(
&self,
_projection: Option<&[ColumnId]>,
_predicate: Option<Predicate>,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Result<BoxedBatchIterator> {
todo!()
// Creates the projection set.
let projection: HashSet<_> = if let Some(projection) = projection {
projection.iter().copied().collect()
} else {
self.metadata.field_columns().map(|c| c.column_id).collect()
};
// Keep only the expressions that convert into a simple filter; no predicate means no filters.
let filters = predicate
.map(|p| {
p.exprs()
.iter()
.filter_map(|f| SimpleFilterEvaluator::try_new(f.df_expr()))
.collect::<Vec<_>>()
})
.unwrap_or_default();
// Select the partitions the iterator has to read.
let partitions = self.prune_partitions(&filters);
let pk_schema = primary_key_schema(&self.metadata);
let pk_datatypes = self
.metadata
.primary_key_columns()
.map(|pk| pk.column_schema.data_type.clone())
.collect();
// The iterator starts without a reader.
let iter = TreeIter {
metadata: self.metadata.clone(),
pk_schema,
pk_datatypes,
projection,
filters,
row_codec: self.row_codec.clone(),
partitions,
current_reader: None,
};
Ok(Box::new(iter))
}
/// Returns true if the tree is empty.
///
/// A tree is empty if no partition has data. A tree without any partition is therefore
/// empty as well.
pub fn is_empty(&self) -> bool {
todo!()
// Hold the read lock while checking every partition.
let partitions = self.partitions.read().unwrap();
partitions.values().all(|part| !part.has_data())
}
/// Marks the tree as immutable.
///
/// Once the tree becomes immutable, callers should not write to it again.
///
/// Freezes the partitions one by one and returns the first error encountered; partitions
/// after the failing one are not visited.
pub fn freeze(&self) -> Result<()> {
todo!()
let partitions = self.partitions.read().unwrap();
for partition in partitions.values() {
partition.freeze()?;
}
Ok(())
}
/// Forks an immutable tree. Returns a mutable tree that inherits the index
/// of this tree.
pub fn fork(&self, _metadata: RegionMetadataRef) -> MergeTree {
todo!()
pub fn fork(&self, metadata: RegionMetadataRef) -> MergeTree {
    // Partitions can only be inherited when both the schema version and the column
    // metadata of the new region metadata match the current ones.
    let schema_unchanged = self.metadata.schema_version == metadata.schema_version
        && self.metadata.column_metadatas == metadata.column_metadatas;
    if !schema_unchanged {
        // The schema has changed, we can't reuse the tree.
        return MergeTree::new(metadata, &self.config);
    }

    let partitions = self.partitions.read().unwrap();
    // Only fork partitions that have data.
    let forked: BTreeMap<_, _> = partitions
        .iter()
        .filter(|(_, part)| part.has_data())
        .map(|(part_key, part)| (*part_key, Arc::new(part.fork(&metadata))))
        .collect();

    MergeTree {
        config: self.config.clone(),
        metadata,
        row_codec: self.row_codec.clone(),
        partitions: RwLock::new(forked),
        is_partitioned: self.is_partitioned,
    }
}
/// Returns the memory size shared by forked trees.
///
/// This is the sum of `shared_memory_size()` over all partitions.
pub fn shared_memory_size(&self) -> usize {
todo!()
let partitions = self.partitions.read().unwrap();
partitions
.values()
.map(|part| part.shared_memory_size())
.sum()
}
fn write_with_key(
    &self,
    primary_key: &[u8],
    key_value: KeyValue,
    metrics: &mut WriteMetrics,
) -> Result<()> {
    // Route the row to the partition selected by its partition key, creating the
    // partition on first use.
    let key = Partition::get_partition_key(&key_value, self.is_partitioned);
    self.get_or_create_partition(key)
        .write_with_key(primary_key, key_value, metrics)
}
fn write_no_key(&self, key_value: KeyValue, metrics: &mut WriteMetrics) -> Result<()> {
    // Route the row to the partition selected by its partition key, creating the
    // partition on first use.
    let key = Partition::get_partition_key(&key_value, self.is_partitioned);
    self.get_or_create_partition(key)
        .write_no_key(key_value, metrics)
}
fn get_or_create_partition(&self, partition_key: PartitionKey) -> PartitionRef {
    // Take the write lock because a missing partition is inserted in place.
    let mut guard = self.partitions.write().unwrap();
    let slot = guard.entry(partition_key).or_insert_with(|| {
        let fresh = Partition::new(self.metadata.clone(), &self.config);
        Arc::new(fresh)
    });
    // Hand out another reference to the partition stored in the map.
    Arc::clone(slot)
}
/// Returns the partitions that may contain rows matching `filters`.
///
/// When the tree is not partitioned, every partition is returned. Otherwise a partition
/// is kept only if every filter on the partition column accepts its partition key;
/// filters on other columns are ignored here. A filter that fails to evaluate is treated
/// as accepting the key, so pruning never drops a partition because of an evaluation
/// error.
///
/// # Panics
/// Panics if the partitions lock is poisoned.
fn prune_partitions(&self, filters: &[SimpleFilterEvaluator]) -> VecDeque<PartitionRef> {
    let partitions = self.partitions.read().unwrap();
    if !self.is_partitioned {
        return partitions.values().cloned().collect();
    }

    // Prune partition keys. All filters on the partition column must hold, not only the
    // first one, so additional filters narrow the result further.
    partitions
        .iter()
        .filter(|(key, _)| {
            let key = ScalarValue::UInt32(Some(**key));
            filters
                .iter()
                .filter(|filter| Partition::is_partition_column(filter.column_name()))
                .all(|filter| filter.evaluate_scalar(&key).unwrap_or(true))
        })
        .map(|(_, partition)| partition.clone())
        .collect()
}
}
/// Iterator built by `MergeTree::scan` to yield batches from the selected partitions.
struct TreeIter {
/// Region metadata cloned from the tree.
metadata: RegionMetadataRef,
/// Arrow schema produced by `primary_key_schema` for the region metadata.
pk_schema: arrow::datatypes::SchemaRef,
/// Data types of the primary key columns, in primary key order.
pk_datatypes: Vec<ConcreteDataType>,
/// Ids of the columns to read.
projection: HashSet<ColumnId>,
/// Filters derived from the scan predicate.
filters: Vec<SimpleFilterEvaluator>,
/// Codec shared with the tree.
row_codec: Arc<McmpRowCodec>,
/// Partitions left after pruning.
partitions: VecDeque<PartitionRef>,
/// Reader in use, if any; `scan` initializes it to `None`.
current_reader: Option<PartitionReader>,
}
impl Iterator for TreeIter {
type Item = Result<Batch>;
/// Not implemented yet; calling it panics.
fn next(&mut self) -> Option<Self::Item> {
unimplemented!()
}
}

View File

@@ -320,7 +320,9 @@ impl SeriesSet {
/// Creates an arrow [SchemaRef](arrow::datatypes::SchemaRef) that only contains primary keys
/// of given region schema
fn primary_key_schema(region_metadata: &RegionMetadataRef) -> arrow::datatypes::SchemaRef {
pub(crate) fn primary_key_schema(
region_metadata: &RegionMetadataRef,
) -> arrow::datatypes::SchemaRef {
let fields = region_metadata
.primary_key_columns()
.map(|pk| {