From 7c88d721c2eeed04aea475b7bd93cd31d5178db6 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 21 Feb 2024 20:50:34 +0800 Subject: [PATCH] Merge pull request #3348 * feat: define functions for partitions * feat: write partitions * feat: fork and freeze partition * feat: create iter by partition * style: fix clippy * chore: typos * feat: add scan method to builder * feat: check whether the builder should freeze first --- src/mito2/src/memtable/merge_tree.rs | 3 +- .../src/memtable/merge_tree/partition.rs | 146 +++++++++++- src/mito2/src/memtable/merge_tree/shard.rs | 44 +++- .../src/memtable/merge_tree/shard_builder.rs | 44 ++++ src/mito2/src/memtable/merge_tree/tree.rs | 224 ++++++++++++++++-- src/mito2/src/memtable/time_series.rs | 4 +- 6 files changed, 446 insertions(+), 19 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs index 6e7c0329b4..8a0a6031a0 100644 --- a/src/mito2/src/memtable/merge_tree.rs +++ b/src/mito2/src/memtable/merge_tree.rs @@ -94,7 +94,8 @@ impl Memtable for MergeTreeMemtable { // TODO(yingwen): Validate schema while inserting rows. let mut metrics = WriteMetrics::default(); - let res = self.tree.write(kvs, &mut metrics); + let mut pk_buffer = Vec::new(); + let res = self.tree.write(kvs, &mut pk_buffer, &mut metrics); self.update_stats(&metrics); diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs index 0a5921c0ca..69c92ff69f 100644 --- a/src/mito2/src/memtable/merge_tree/partition.rs +++ b/src/mito2/src/memtable/merge_tree/partition.rs @@ -16,11 +16,20 @@ //! //! We only support partitioning the tree by pre-defined internal columns. +use std::collections::HashSet; use std::sync::{Arc, RwLock}; +use common_recordbatch::filter::SimpleFilterEvaluator; +use store_api::metadata::RegionMetadataRef; +use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME; +use store_api::storage::ColumnId; + +use crate::error::Result; +use crate::memtable::key_values::KeyValue; +use crate::memtable::merge_tree::metrics::WriteMetrics; use crate::memtable::merge_tree::shard::Shard; use crate::memtable::merge_tree::shard_builder::ShardBuilder; -use crate::memtable::merge_tree::ShardId; +use crate::memtable::merge_tree::{MergeTreeConfig, PkId, ShardId}; /// Key of a partition. pub type PartitionKey = u32; @@ -30,13 +39,146 @@ pub struct Partition { inner: RwLock, } +impl Partition { + /// Creates a new partition. + pub fn new(_metadata: RegionMetadataRef, _config: &MergeTreeConfig) -> Self { + unimplemented!() + } + + /// Writes to the partition with a primary key. + pub fn write_with_key( + &self, + primary_key: &[u8], + key_value: KeyValue, + metrics: &mut WriteMetrics, + ) -> Result<()> { + let mut inner = self.inner.write().unwrap(); + // Now we ensure one key only exists in one shard. + if let Some(pk_id) = inner.find_key_in_shards(primary_key) { + // Key already in shards. + return inner.write_to_shard(pk_id, key_value); + } + + if inner.shard_builder.should_freeze() { + let shard_id = inner.active_shard_id; + let shard = inner.shard_builder.finish(shard_id)?; + inner.active_shard_id += 1; + inner.shards.push(shard); + } + + // Write to the shard builder. + inner + .shard_builder + .write_with_key(primary_key, key_value, metrics)?; + + Ok(()) + } + + /// Writes to the partition without a primary key. 
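// An illustrative, self-contained sketch of the write path added above: a primary key is
// looked up in the frozen shards first and only goes to the active shard builder when it
// is absent, so each key lives in exactly one shard; the builder is frozen into a new
// shard before it overflows. SketchShard, SketchBuilder and SketchPartition are simplified
// stand-ins for the crate's Shard, ShardBuilder and Partition, not the real API.
use std::collections::HashMap;

type ShardId = u32;
type PkIndex = u16;

struct PkId {
    shard_id: ShardId,
    pk_index: PkIndex,
}

struct SketchShard {
    shard_id: ShardId,
    keys: HashMap<Vec<u8>, PkIndex>,
}

impl SketchShard {
    /// Returns the pk id of the key if this shard owns it.
    fn find_key(&self, key: &[u8]) -> Option<PkId> {
        self.keys.get(key).map(|index| PkId {
            shard_id: self.shard_id,
            pk_index: *index,
        })
    }
}

struct SketchBuilder {
    keys: Vec<Vec<u8>>,
    max_keys: usize,
}

impl SketchBuilder {
    fn should_freeze(&self) -> bool {
        self.keys.len() >= self.max_keys
    }

    /// Builds a shard from the buffered keys and resets the builder.
    fn finish(&mut self, shard_id: ShardId) -> SketchShard {
        let keys = std::mem::take(&mut self.keys)
            .into_iter()
            .enumerate()
            .map(|(index, key)| (key, index as PkIndex))
            .collect();
        SketchShard { shard_id, keys }
    }
}

struct SketchPartition {
    shard_builder: SketchBuilder,
    active_shard_id: ShardId,
    shards: Vec<SketchShard>,
}

impl SketchPartition {
    fn write_with_key(&mut self, primary_key: &[u8]) {
        // The key was already frozen into a shard: reuse its pk id instead of re-adding it.
        if let Some(_pk_id) = self.shards.iter().find_map(|s| s.find_key(primary_key)) {
            return;
        }
        // Freeze the active builder into a new shard before it overflows.
        if self.shard_builder.should_freeze() {
            let shard = self.shard_builder.finish(self.active_shard_id);
            self.active_shard_id += 1;
            self.shards.push(shard);
        }
        self.shard_builder.keys.push(primary_key.to_vec());
    }
}

fn main() {
    let mut partition = SketchPartition {
        shard_builder: SketchBuilder { keys: Vec::new(), max_keys: 2 },
        active_shard_id: 0,
        shards: Vec::new(),
    };
    for key in [b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec(), b"k1".to_vec()] {
        partition.write_with_key(&key);
    }
    // k1 and k2 were frozen into shard 0, k3 sits in the active builder, and the second
    // write of k1 found it in shard 0 instead of duplicating it.
    assert_eq!(partition.shards.len(), 1);
    assert_eq!(partition.shard_builder.keys.len(), 1);
}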
+ pub fn write_no_key(&self, key_value: KeyValue, metrics: &mut WriteMetrics) -> Result<()> { + let mut inner = self.inner.write().unwrap(); + // If no primary key, always write to the first shard. + if inner.shards.is_empty() { + let shard_id = inner.active_shard_id; + inner.shards.push(Shard::new_no_dict(shard_id)); + inner.active_shard_id += 1; + } + + // A dummy pk id. + let pk_id = PkId { + shard_id: inner.active_shard_id - 1, + pk_index: 0, + }; + inner.shards[0].write_key_value(pk_id, key_value, metrics) + } + + /// Scans data in the partition. + pub fn scan( + &self, + _projection: HashSet, + _filters: Vec, + ) -> Result { + unimplemented!() + } + + /// Freezes the partition. + pub fn freeze(&self) -> Result<()> { + unimplemented!() + } + + /// Forks the partition. + pub fn fork(&self, _metadata: &RegionMetadataRef) -> Partition { + unimplemented!() + } + + /// Returns true if the partition has data. + pub fn has_data(&self) -> bool { + unimplemented!() + } + + /// Returns shared memory size of the partition. + pub fn shared_memory_size(&self) -> usize { + unimplemented!() + } + + /// Get partition key from the key value. + pub(crate) fn get_partition_key(key_value: &KeyValue, is_partitioned: bool) -> PartitionKey { + if !is_partitioned { + return PartitionKey::default(); + } + + let Some(value) = key_value.primary_keys().next() else { + return PartitionKey::default(); + }; + + value.as_u32().unwrap().unwrap() + } + + /// Returns true if the region can be partitioned. + pub(crate) fn has_multi_partitions(metadata: &RegionMetadataRef) -> bool { + metadata + .primary_key_columns() + .next() + .map(|meta| meta.column_schema.name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME) + .unwrap_or(false) + } + + /// Returns true if this is a partition column. + pub(crate) fn is_partition_column(name: &str) -> bool { + name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME + } +} + +/// Reader to scan rows in a partition. +/// +/// It can merge rows from multiple shards. +pub struct PartitionReader {} + pub type PartitionRef = Arc; /// Inner struct of the partition. +/// +/// A key only exists in one shard. struct Inner { /// Shard whose dictionary is active. shard_builder: ShardBuilder, - next_shard_id: ShardId, + active_shard_id: ShardId, /// Shards with frozon dictionary. shards: Vec, } + +impl Inner { + fn find_key_in_shards(&self, primary_key: &[u8]) -> Option { + for shard in &self.shards { + if let Some(pkid) = shard.find_key(primary_key) { + return Some(pkid); + } + } + + None + } + + fn write_to_shard(&mut self, _pk_id: PkId, _key_value: KeyValue) -> Result<()> { + unimplemented!() + } +} diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index d7fb74b6ba..9eceb49201 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -14,9 +14,17 @@ //! Shard in a partition. +use std::collections::HashSet; + +use common_recordbatch::filter::SimpleFilterEvaluator; +use store_api::storage::ColumnId; + +use crate::error::Result; +use crate::memtable::key_values::KeyValue; use crate::memtable::merge_tree::data::DataParts; use crate::memtable::merge_tree::dict::KeyDictRef; -use crate::memtable::merge_tree::ShardId; +use crate::memtable::merge_tree::metrics::WriteMetrics; +use crate::memtable::merge_tree::{PkId, ShardId}; /// Shard stores data related to the same key dictionary. pub struct Shard { @@ -26,3 +34,37 @@ pub struct Shard { /// Data in the shard. 
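// An illustrative sketch of how the partition key is derived in the code above: when the
// region is partitioned (its first primary-key column is the metric engine's table-id
// column), the u32 table id of each row becomes the partition key; otherwise every row
// maps to the single default partition. `first_pk_value` is an assumed stand-in for the
// first primary-key value carried by the crate's KeyValue.
type PartitionKey = u32;

fn partition_key(first_pk_value: Option<u32>, is_partitioned: bool) -> PartitionKey {
    if !is_partitioned {
        // A region without the table-id prefix always uses one partition.
        return PartitionKey::default();
    }
    // In a partitioned region every row carries a table id; fall back to the default
    // partition only when no primary key is present.
    first_pk_value.unwrap_or_default()
}

fn main() {
    assert_eq!(partition_key(Some(1024), true), 1024);
    assert_eq!(partition_key(Some(1024), false), 0);
    assert_eq!(partition_key(None, true), 0);
}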
data_parts: DataParts, } + +impl Shard { + /// Returns a shard without dictionary. + pub fn new_no_dict(_shard_id: ShardId) -> Shard { + unimplemented!() + } + + /// Returns the pk id of the key if it exists. + pub fn find_key(&self, _key: &[u8]) -> Option { + unimplemented!() + } + + /// Writes a key value into the shard. + pub fn write_key_value( + &mut self, + _pk_id: PkId, + _key_value: KeyValue, + _metrics: &mut WriteMetrics, + ) -> Result<()> { + unimplemented!() + } + + /// Scans the shard. + pub fn scan( + &self, + _projection: &HashSet, + _filters: &[SimpleFilterEvaluator], + ) -> ShardReader { + unimplemented!() + } +} + +/// Reader to read rows in a shard. +pub struct ShardReader {} diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs index a663662049..c8d7802904 100644 --- a/src/mito2/src/memtable/merge_tree/shard_builder.rs +++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs @@ -14,8 +14,13 @@ //! Builder of a shard. +use crate::error::Result; +use crate::memtable::key_values::KeyValue; use crate::memtable::merge_tree::data::DataBuffer; use crate::memtable::merge_tree::dict::KeyDictBuilder; +use crate::memtable::merge_tree::metrics::WriteMetrics; +use crate::memtable::merge_tree::shard::Shard; +use crate::memtable::merge_tree::ShardId; /// Builder to write keys and data to a shard that the key dictionary /// is still active. @@ -24,4 +29,43 @@ pub struct ShardBuilder { dict_builder: KeyDictBuilder, /// Buffer to store data. data_buffer: DataBuffer, + /// Max keys in an index shard. + index_max_keys_per_shard: usize, + /// Number of rows to freeze a data part. + data_freeze_threshold: usize, } + +impl ShardBuilder { + /// Write a key value with its encoded primary key. + pub fn write_with_key( + &mut self, + _key: &[u8], + _key_value: KeyValue, + _metrics: &mut WriteMetrics, + ) -> Result<()> { + unimplemented!() + } + + /// Returns true if the builder is empty. + pub fn is_empty(&self) -> bool { + unimplemented!() + } + + /// Returns true if the builder need to freeze. + pub fn should_freeze(&self) -> bool { + unimplemented!() + } + + /// Builds a new shard and resets the builder. + pub fn finish(&mut self, _shard_id: ShardId) -> Result { + unimplemented!() + } + + /// Scans the shard builder + pub fn scan(&mut self, _shard_id: ShardId) -> Result { + unimplemented!() + } +} + +/// Reader to scan a shard. builder. +pub struct ShardBuilderReader {} diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/merge_tree/tree.rs index 39b6fbea98..d9c26611f3 100644 --- a/src/mito2/src/memtable/merge_tree/tree.rs +++ b/src/mito2/src/memtable/merge_tree/tree.rs @@ -14,19 +14,31 @@ //! Implementation of the merge tree. 
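// A minimal sketch of one plausible freeze check for the shard builder fields introduced
// above (`should_freeze` itself is still unimplemented in this patch): the active builder
// is frozen into a shard once its dictionary holds `index_max_keys_per_shard` keys or its
// data buffer holds `data_freeze_threshold` rows. The counters here are assumptions, not
// the crate's real bookkeeping.
struct ShardBuilderSketch {
    key_count: usize,
    buffered_rows: usize,
    index_max_keys_per_shard: usize,
    data_freeze_threshold: usize,
}

impl ShardBuilderSketch {
    fn should_freeze(&self) -> bool {
        self.key_count >= self.index_max_keys_per_shard
            || self.buffered_rows >= self.data_freeze_threshold
    }
}

fn main() {
    let builder = ShardBuilderSketch {
        key_count: 4096,
        buffered_rows: 100_000,
        index_max_keys_per_shard: 8192,
        data_freeze_threshold: 32768,
    };
    // The row threshold is exceeded even though the key dictionary still has room.
    assert!(builder.should_freeze());
}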
-use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashSet, VecDeque}; use std::sync::{Arc, RwLock}; +use api::v1::OpType; +use common_recordbatch::filter::SimpleFilterEvaluator; +use common_time::Timestamp; +use datafusion_common::ScalarValue; +use datatypes::arrow; +use datatypes::data_type::ConcreteDataType; +use snafu::ensure; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; use table::predicate::Predicate; -use crate::error::Result; +use crate::error::{PrimaryKeyLengthMismatchSnafu, Result}; +use crate::memtable::key_values::KeyValue; use crate::memtable::merge_tree::metrics::WriteMetrics; -use crate::memtable::merge_tree::partition::{PartitionKey, PartitionRef}; +use crate::memtable::merge_tree::partition::{ + Partition, PartitionKey, PartitionReader, PartitionRef, +}; use crate::memtable::merge_tree::MergeTreeConfig; +use crate::memtable::time_series::primary_key_schema; use crate::memtable::{BoxedBatchIterator, KeyValues}; -use crate::row_converter::{McmpRowCodec, SortField}; +use crate::read::Batch; +use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; /// The merge tree. pub struct MergeTree { @@ -38,6 +50,8 @@ pub struct MergeTree { row_codec: Arc, /// Partitions in the tree. partitions: RwLock>, + /// Whether the tree has multiple partitions. + is_partitioned: bool, } impl MergeTree { @@ -49,12 +63,14 @@ impl MergeTree { .map(|c| SortField::new(c.column_schema.data_type.clone())) .collect(), ); + let is_partitioned = Partition::has_multi_partitions(&metadata); MergeTree { config: config.clone(), metadata, row_codec: Arc::new(row_codec), partitions: Default::default(), + is_partitioned, } } @@ -63,39 +79,219 @@ impl MergeTree { /// /// # Panics /// Panics if the tree is immutable (frozen). - pub fn write(&self, _kvs: &KeyValues, _metrics: &mut WriteMetrics) -> Result<()> { - todo!() + pub fn write( + &self, + kvs: &KeyValues, + pk_buffer: &mut Vec, + metrics: &mut WriteMetrics, + ) -> Result<()> { + let has_pk = !self.metadata.primary_key.is_empty(); + + for kv in kvs.iter() { + ensure!( + kv.num_primary_keys() == self.row_codec.num_fields(), + PrimaryKeyLengthMismatchSnafu { + expect: self.row_codec.num_fields(), + actual: kv.num_primary_keys(), + } + ); + // Safety: timestamp of kv must be both present and a valid timestamp value. + let ts = kv.timestamp().as_timestamp().unwrap().unwrap().value(); + metrics.min_ts = metrics.min_ts.min(ts); + metrics.max_ts = metrics.max_ts.max(ts); + metrics.value_bytes += kv.fields().map(|v| v.data_size()).sum::(); + + if !has_pk { + // No primary key. + self.write_no_key(kv, metrics)?; + continue; + } + + // Encode primary key. + pk_buffer.clear(); + self.row_codec.encode_to_vec(kv.primary_keys(), pk_buffer)?; + + // Write rows with primary keys. + self.write_with_key(pk_buffer, kv, metrics)?; + } + + metrics.value_bytes += + kvs.num_rows() * (std::mem::size_of::() + std::mem::size_of::()); + + Ok(()) } /// Scans the tree. pub fn scan( &self, - _projection: Option<&[ColumnId]>, - _predicate: Option, + projection: Option<&[ColumnId]>, + predicate: Option, ) -> Result { - todo!() + // Creates the projection set. 
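// An illustrative sketch of the primary-key buffer reuse in MergeTree::write above: the
// caller passes a single Vec<u8> that is cleared and refilled for every row, so encoding
// a primary key does not allocate per row. `encode_pk` is a toy stand-in for the crate's
// McmpRowCodec::encode_to_vec, not its real encoding.
fn encode_pk(primary_keys: &[&str], out: &mut Vec<u8>) {
    for part in primary_keys {
        out.extend_from_slice(part.as_bytes());
        out.push(0); // field separator used by this sketch only
    }
}

fn write_rows(rows: &[Vec<&str>]) {
    let mut pk_buffer = Vec::new();
    for row in rows {
        // Reuse the same allocation for every row instead of building a fresh Vec.
        pk_buffer.clear();
        encode_pk(row, &mut pk_buffer);
        // ... route `pk_buffer` to the partition that owns this key ...
    }
}

fn main() {
    write_rows(&[vec!["host-1", "dc-1"], vec!["host-2", "dc-1"]]);
}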
+ let projection: HashSet<_> = if let Some(projection) = projection { + projection.iter().copied().collect() + } else { + self.metadata.field_columns().map(|c| c.column_id).collect() + }; + + let filters = predicate + .map(|p| { + p.exprs() + .iter() + .filter_map(|f| SimpleFilterEvaluator::try_new(f.df_expr())) + .collect::>() + }) + .unwrap_or_default(); + + let partitions = self.prune_partitions(&filters); + let pk_schema = primary_key_schema(&self.metadata); + let pk_datatypes = self + .metadata + .primary_key_columns() + .map(|pk| pk.column_schema.data_type.clone()) + .collect(); + + let iter = TreeIter { + metadata: self.metadata.clone(), + pk_schema, + pk_datatypes, + projection, + filters, + row_codec: self.row_codec.clone(), + partitions, + current_reader: None, + }; + + Ok(Box::new(iter)) } /// Returns true if the tree is empty. + /// + /// A tree is empty if no partition has data. pub fn is_empty(&self) -> bool { - todo!() + let partitions = self.partitions.read().unwrap(); + partitions.values().all(|part| !part.has_data()) } /// Marks the tree as immutable. /// /// Once the tree becomes immutable, callers should not write to it again. pub fn freeze(&self) -> Result<()> { - todo!() + let partitions = self.partitions.read().unwrap(); + for partition in partitions.values() { + partition.freeze()?; + } + Ok(()) } /// Forks an immutable tree. Returns a mutable tree that inherits the index /// of this tree. - pub fn fork(&self, _metadata: RegionMetadataRef) -> MergeTree { - todo!() + pub fn fork(&self, metadata: RegionMetadataRef) -> MergeTree { + if self.metadata.schema_version != metadata.schema_version + || self.metadata.column_metadatas != metadata.column_metadatas + { + // The schema has changed, we can't reuse the tree. + return MergeTree::new(metadata, &self.config); + } + + let mut forked = BTreeMap::new(); + let partitions = self.partitions.read().unwrap(); + for (part_key, part) in partitions.iter() { + if !part.has_data() { + continue; + } + + // Only fork partitions that have data. + let forked_part = part.fork(&metadata); + forked.insert(*part_key, Arc::new(forked_part)); + } + + MergeTree { + config: self.config.clone(), + metadata, + row_codec: self.row_codec.clone(), + partitions: RwLock::new(forked), + is_partitioned: self.is_partitioned, + } } /// Returns the memory size shared by forked trees. 
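// An illustrative sketch of the fork rule used above: a frozen tree is reused only when
// the schema version and column metadata are unchanged, and only partitions that already
// hold data are carried into the new mutable tree. MetaSketch and PartSketch are
// simplified stand-ins for RegionMetadataRef and PartitionRef.
use std::collections::BTreeMap;

#[derive(PartialEq)]
struct MetaSketch {
    schema_version: u64,
    columns: Vec<String>,
}

#[derive(Clone)]
struct PartSketch {
    rows: usize,
}

fn fork(
    old_meta: &MetaSketch,
    new_meta: &MetaSketch,
    partitions: &BTreeMap<u32, PartSketch>,
) -> BTreeMap<u32, PartSketch> {
    if old_meta != new_meta {
        // The schema changed, so the old dictionaries cannot be reused: start empty.
        return BTreeMap::new();
    }
    partitions
        .iter()
        .filter(|(_, part)| part.rows > 0) // skip partitions without data
        .map(|(key, part)| (*key, part.clone()))
        .collect()
}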
pub fn shared_memory_size(&self) -> usize { - todo!() + let partitions = self.partitions.read().unwrap(); + partitions + .values() + .map(|part| part.shared_memory_size()) + .sum() + } + + fn write_with_key( + &self, + primary_key: &[u8], + key_value: KeyValue, + metrics: &mut WriteMetrics, + ) -> Result<()> { + let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned); + let partition = self.get_or_create_partition(partition_key); + + partition.write_with_key(primary_key, key_value, metrics) + } + + fn write_no_key(&self, key_value: KeyValue, metrics: &mut WriteMetrics) -> Result<()> { + let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned); + let partition = self.get_or_create_partition(partition_key); + + partition.write_no_key(key_value, metrics) + } + + fn get_or_create_partition(&self, partition_key: PartitionKey) -> PartitionRef { + let mut partitions = self.partitions.write().unwrap(); + partitions + .entry(partition_key) + .or_insert_with(|| Arc::new(Partition::new(self.metadata.clone(), &self.config))) + .clone() + } + + fn prune_partitions(&self, filters: &[SimpleFilterEvaluator]) -> VecDeque { + let partitions = self.partitions.read().unwrap(); + if self.is_partitioned { + // Prune partition keys. + for filter in filters { + // Only the first filter takes effect. + if Partition::is_partition_column(filter.column_name()) { + let mut pruned = VecDeque::new(); + for (key, partition) in partitions.iter() { + if filter + .evaluate_scalar(&ScalarValue::UInt32(Some(*key))) + .unwrap_or(true) + { + pruned.push_back(partition.clone()); + } + } + + return pruned; + } + } + } + + partitions.values().cloned().collect() + } +} + +struct TreeIter { + metadata: RegionMetadataRef, + pk_schema: arrow::datatypes::SchemaRef, + pk_datatypes: Vec, + projection: HashSet, + filters: Vec, + row_codec: Arc, + partitions: VecDeque, + current_reader: Option, +} + +impl Iterator for TreeIter { + type Item = Result; + + fn next(&mut self) -> Option { + unimplemented!() } } diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 3c86d5cd3b..f2bbe2030d 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -320,7 +320,9 @@ impl SeriesSet { /// Creates an arrow [SchemaRef](arrow::datatypes::SchemaRef) that only contains primary keys /// of given region schema -fn primary_key_schema(region_metadata: &RegionMetadataRef) -> arrow::datatypes::SchemaRef { +pub(crate) fn primary_key_schema( + region_metadata: &RegionMetadataRef, +) -> arrow::datatypes::SchemaRef { let fields = region_metadata .primary_key_columns() .map(|pk| {
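// An illustrative sketch of the partition pruning performed by `prune_partitions` above:
// when the tree is partitioned by table id, the first filter on the partition column
// decides which partitions a scan reads, and without such a filter every partition is
// scanned. The `matches` closure is an assumed stand-in for evaluating a
// SimpleFilterEvaluator against the UInt32 partition key.
use std::collections::{BTreeMap, VecDeque};

fn prune_partitions<F>(
    partitions: &BTreeMap<u32, String>,
    partition_filter: Option<F>,
) -> VecDeque<String>
where
    F: Fn(u32) -> bool,
{
    match partition_filter {
        // A filter on the partition column keeps only the matching partition keys.
        Some(matches) => partitions
            .iter()
            .filter(|(key, _)| matches(**key))
            .map(|(_, partition)| partition.clone())
            .collect(),
        // No usable filter: the scan has to visit every partition.
        None => partitions.values().cloned().collect(),
    }
}

fn main() {
    let mut partitions = BTreeMap::new();
    partitions.insert(1, "partition for table 1".to_string());
    partitions.insert(2, "partition for table 2".to_string());
    let pruned = prune_partitions(&partitions, Some(|table_id| table_id == 2));
    assert_eq!(pruned, VecDeque::from(vec!["partition for table 2".to_string()]));
}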