feat: Implement iter for the new memtable (#3373)

* chore: read shard builder

* chore: reuse pk weights

* chore: prune key

* chore: shard reader wip

* refactor: shard builder DataBatch

* feat: merge shard readers

* feat: return shard id in shard readers

* feat: impl partition reader

* chore: impl partition read

* feat: impl iter tree

* chore: save last yield pk id

* style: fix clippy

* refactor: rename ShardReaderImpl to ShardReader

* chore: address CR comment
This commit is contained in:
Yingwen
2024-02-25 15:42:16 +08:00
committed by GitHub
parent afe4633320
commit 8059b95e37
6 changed files with 666 additions and 89 deletions

View File

@@ -110,10 +110,10 @@ impl Memtable for MergeTreeMemtable {
fn iter(
&self,
_projection: Option<&[ColumnId]>,
_predicate: Option<Predicate>,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Result<BoxedBatchIterator> {
todo!()
self.tree.read(projection, predicate)
}
fn is_empty(&self) -> bool {
@@ -275,18 +275,22 @@ impl MemtableBuilder for MergeTreeMemtableBuilder {
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use common_time::Timestamp;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};
use super::*;
use crate::test_util::memtable_util;
#[test]
fn test_memtable_sorted_input() {
write_sorted_input(true);
write_sorted_input(false);
write_iter_sorted_input(true);
write_iter_sorted_input(false);
}
fn write_sorted_input(has_pk: bool) {
fn write_iter_sorted_input(has_pk: bool) {
let metadata = if has_pk {
memtable_util::metadata_with_primary_key(vec![1, 0], true)
} else {
@@ -298,7 +302,27 @@ mod tests {
let memtable = MergeTreeMemtable::new(1, metadata, None, &MergeTreeConfig::default());
memtable.write(&kvs).unwrap();
// TODO(yingwen): Test iter.
let expected_ts = kvs
.iter()
.map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
.collect::<BTreeSet<_>>();
let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<BTreeSet<_>>();
assert_eq!(expected_ts, read);
let stats = memtable.stats();
assert!(stats.bytes_allocated() > 0);
@@ -344,7 +368,36 @@ mod tests {
);
memtable.write(&kvs).unwrap();
// TODO(yingwen): Test iter.
let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<Vec<_>>();
assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);
let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.sequences()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap())
.collect::<Vec<_>>();
assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read);
let stats = memtable.stats();
assert!(stats.bytes_allocated() > 0);
@@ -353,4 +406,41 @@ mod tests {
stats.time_range()
);
}
/// Exercises projected reads once with a primary key and once without.
#[test]
fn test_memtable_projection() {
    for has_pk in [true, false] {
        write_iter_projection(has_pk);
    }
}
/// Writes values `0..100` into a fresh memtable, reads them back while
/// projecting only column id 3, and checks that every batch carries exactly
/// one `Int64` field column whose concatenated values equal the input.
fn write_iter_projection(has_pk: bool) {
    let metadata = if has_pk {
        memtable_util::metadata_with_primary_key(vec![1, 0], true)
    } else {
        memtable_util::metadata_with_primary_key(vec![], false)
    };
    // Try to build a memtable via the builder.
    let memtable = MergeTreeMemtableBuilder::new(None).build(1, &metadata);

    let expect: Vec<_> = (0..100).collect();
    let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
    memtable.write(&kvs).unwrap();

    let mut v0_all = vec![];
    let batches = memtable.iter(Some(&[3]), None).unwrap();
    for batch in batches.map(|res| res.unwrap()) {
        let fields = batch.fields();
        // Only the projected field column should survive.
        assert_eq!(1, fields.len());
        let v0 = fields
            .first()
            .unwrap()
            .data
            .as_any()
            .downcast_ref::<Int64Vector>()
            .unwrap();
        v0_all.extend(v0.iter_data().map(|value| value.unwrap()));
    }

    assert_eq!(expect, v0_all);
}
}

View File

@@ -188,8 +188,8 @@ impl DictBuilderReader {
}
/// Returns pk weights to sort a data part and replaces pk indices.
pub(crate) fn pk_weights_to_sort_data(&self) -> Vec<u16> {
compute_pk_weights(&self.sorted_pk_indices)
pub(crate) fn pk_weights_to_sort_data(&self, pk_weights: &mut Vec<u16>) {
compute_pk_weights(&self.sorted_pk_indices, pk_weights)
}
/// Returns pk indices sorted by keys.
@@ -199,12 +199,11 @@ impl DictBuilderReader {
}
/// Returns pk weights to sort a data part and replaces pk indices.
fn compute_pk_weights(sorted_pk_indices: &[PkIndex]) -> Vec<u16> {
let mut pk_weights = vec![0; sorted_pk_indices.len()];
fn compute_pk_weights(sorted_pk_indices: &[PkIndex], pk_weights: &mut Vec<u16>) {
pk_weights.resize(sorted_pk_indices.len(), 0);
for (weight, pk_index) in sorted_pk_indices.iter().enumerate() {
pk_weights[*pk_index as usize] = weight as u16;
}
pk_weights
}
/// A key dictionary.
@@ -240,7 +239,9 @@ impl KeyDict {
/// Returns pk weights to sort a data part and replaces pk indices.
pub(crate) fn pk_weights_to_sort_data(&self) -> Vec<u16> {
compute_pk_weights(&self.key_positions)
let mut pk_weights = Vec::with_capacity(self.key_positions.len());
compute_pk_weights(&self.key_positions, &mut pk_weights);
pk_weights
}
/// Returns the shared memory size.

View File

@@ -19,6 +19,7 @@
use std::collections::HashSet;
use std::sync::{Arc, RwLock};
use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
@@ -26,11 +27,13 @@ use store_api::storage::ColumnId;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::data::{DataParts, DATA_INIT_CAP};
use crate::memtable::merge_tree::data::{DataBatch, DataParts, DATA_INIT_CAP};
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::shard::Shard;
use crate::memtable::merge_tree::shard::{Shard, ShardMerger, ShardNode, ShardSource};
use crate::memtable::merge_tree::shard_builder::ShardBuilder;
use crate::memtable::merge_tree::{MergeTreeConfig, PkId, ShardId};
use crate::memtable::merge_tree::{MergeTreeConfig, PkId};
use crate::read::{Batch, BatchBuilder};
use crate::row_converter::{McmpRowCodec, RowCodec};
/// Key of a partition.
pub type PartitionKey = u32;
@@ -40,13 +43,13 @@ pub struct Partition {
inner: RwLock<Inner>,
}
pub type PartitionRef = Arc<Partition>;
impl Partition {
/// Creates a new partition.
pub fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> Self {
let shard_builder = ShardBuilder::new(metadata.clone(), config);
Partition {
inner: RwLock::new(Inner::new(metadata, shard_builder, config.dedup)),
inner: RwLock::new(Inner::new(metadata, config)),
}
}
@@ -83,7 +86,7 @@ impl Partition {
let mut inner = self.inner.write().unwrap();
// If no primary key, always write to the first shard.
debug_assert!(!inner.shards.is_empty());
debug_assert_eq!(1, inner.active_shard_id);
debug_assert_eq!(1, inner.shard_builder.current_shard_id());
// A dummy pk id.
let pk_id = PkId {
@@ -95,12 +98,31 @@ impl Partition {
}
/// Scans data in the partition.
pub fn scan(
&self,
_projection: HashSet<ColumnId>,
_filters: Vec<SimpleFilterEvaluator>,
) -> Result<PartitionReader> {
unimplemented!()
pub fn read(&self, mut context: ReadPartitionContext) -> Result<PartitionReader> {
// TODO(yingwen): Change to acquire read lock if `read()` takes `&self`.
let nodes = {
let mut inner = self.inner.write().unwrap();
let mut nodes = Vec::with_capacity(inner.shards.len() + 1);
let bulder_reader = inner.shard_builder.read(&mut context.pk_weights)?;
nodes.push(ShardNode::new(ShardSource::Builder(bulder_reader)));
for shard in &mut inner.shards {
let shard_reader = shard.read()?;
nodes.push(ShardNode::new(ShardSource::Shard(shard_reader)));
}
nodes
};
// Creating a shard merger will invoke next so we do it outside of the lock.
let shard_merger = ShardMerger::try_new(nodes)?;
Ok(PartitionReader {
metadata: context.metadata,
row_codec: context.row_codec,
projection: context.projection,
filters: context.filters,
pk_weights: context.pk_weights,
shard_merger,
last_yield_pk_id: None,
})
}
/// Freezes the partition.
@@ -111,10 +133,17 @@ impl Partition {
}
/// Forks the partition.
///
/// Must freeze the partition before fork.
pub fn fork(&self, metadata: &RegionMetadataRef, config: &MergeTreeConfig) -> Partition {
let inner = self.inner.read().unwrap();
debug_assert!(inner.shard_builder.is_empty());
// TODO(yingwen): TTL or evict shards.
let shard_builder = ShardBuilder::new(metadata.clone(), config);
let shard_builder = ShardBuilder::new(
metadata.clone(),
config,
inner.shard_builder.current_shard_id(),
);
let shards = inner
.shards
.iter()
@@ -125,7 +154,6 @@ impl Partition {
inner: RwLock::new(Inner {
metadata: metadata.clone(),
shard_builder,
active_shard_id: inner.active_shard_id,
shards,
num_rows: 0,
dedup: config.dedup,
@@ -180,9 +208,187 @@ impl Partition {
/// Reader to scan rows in a partition.
///
/// It can merge rows from multiple shards.
pub struct PartitionReader {}
pub struct PartitionReader {
/// Region metadata; used to detect an empty primary key and to resolve columns by name.
metadata: RegionMetadataRef,
/// Codec handed to `prune_primary_key` to decode encoded primary keys.
row_codec: Arc<McmpRowCodec>,
/// Ids of the columns to keep; field columns outside this set are skipped on conversion.
projection: HashSet<ColumnId>,
/// Filters evaluated against decoded primary key values when pruning.
filters: Vec<SimpleFilterEvaluator>,
/// Pk weights buffer carried over from [ReadPartitionContext] and handed back by `into_context()`.
pk_weights: Vec<u16>,
/// Merger over the shard builder reader and the shard readers.
shard_merger: ShardMerger,
/// Pk id of the last key that passed pruning, so batches with the same key skip re-evaluation.
last_yield_pk_id: Option<PkId>,
}
pub type PartitionRef = Arc<Partition>;
impl PartitionReader {
pub fn is_valid(&self) -> bool {
self.shard_merger.is_valid()
}
pub fn next(&mut self) -> Result<()> {
self.shard_merger.next()?;
if self.metadata.primary_key.is_empty() {
// Nothing to prune.
return Ok(());
}
while self.shard_merger.is_valid() {
let pk_id = self.shard_merger.current_pk_id();
if let Some(yield_pk_id) = self.last_yield_pk_id {
if pk_id == yield_pk_id {
// If this batch has the same key as last returned batch.
// We can return it without evaluating filters.
break;
}
}
let key = self.shard_merger.current_key().unwrap();
// Prune batch by primary key.
if prune_primary_key(&self.metadata, &self.filters, &self.row_codec, key) {
// We need this key.
self.last_yield_pk_id = Some(pk_id);
break;
}
self.shard_merger.next()?;
}
Ok(())
}
pub fn convert_current_batch(&self) -> Result<Batch> {
let data_batch = self.shard_merger.current_data_batch();
data_batch_to_batch(
&self.metadata,
&self.projection,
self.shard_merger.current_key(),
data_batch,
)
}
pub(crate) fn into_context(self) -> ReadPartitionContext {
ReadPartitionContext {
metadata: self.metadata,
row_codec: self.row_codec,
projection: self.projection,
filters: self.filters,
pk_weights: self.pk_weights,
}
}
}
// TODO(yingwen): Improve performance of key pruning. Now we need to find index and
// then decode and convert each value.
/// Returns true if the `pk` is still needed.
///
/// The key is kept when there are no filters, when the region has no primary
/// key, or when the key fails to decode (the failure is logged). Otherwise each
/// filter that references a tag column is evaluated against the decoded value
/// of that column; the key is dropped as soon as one filter evaluates to
/// `false`. Filters on unknown or non-tag columns are ignored, and a filter
/// that fails to evaluate is treated as a match.
fn prune_primary_key(
    metadata: &RegionMetadataRef,
    filters: &[SimpleFilterEvaluator],
    codec: &McmpRowCodec,
    pk: &[u8],
) -> bool {
    // Nothing to evaluate, or no primary key to evaluate against.
    if filters.is_empty() || metadata.primary_key.is_empty() {
        return true;
    }

    let pk_values = match codec.decode(pk) {
        Ok(values) => values,
        Err(e) => {
            common_telemetry::error!(e; "Failed to decode primary key");
            return true;
        }
    };

    // Evaluate filters against primary key values.
    for filter in filters {
        let Some(column) = metadata.column_by_name(filter.column_name()) else {
            continue;
        };
        // Ignore filters that are not referencing primary key columns.
        if column.semantic_type != SemanticType::Tag {
            continue;
        }
        // Index of the column in primary keys.
        // Safety: A tag column is always in primary key.
        let index = metadata.primary_key_index(column.column_id).unwrap();
        // Safety: arrow schema and datatypes are constructed from the same source.
        let scalar_value = pk_values[index]
            .try_to_scalar_value(&column.column_schema.data_type)
            .unwrap();
        // One rejecting filter is enough to drop the key; skip the remaining
        // conversions and evaluations.
        if !filter.evaluate_scalar(&scalar_value).unwrap_or(true) {
            return false;
        }
    }

    true
}
/// Structs to reuse across readers to avoid allocating for each reader.
///
/// A context is moved into a partition reader by `Partition::read()` and
/// recovered from it with `PartitionReader::into_context()`.
pub(crate) struct ReadPartitionContext {
/// Metadata of the region being read.
metadata: RegionMetadataRef,
/// Codec for primary keys.
row_codec: Arc<McmpRowCodec>,
/// Ids of the columns to read.
projection: HashSet<ColumnId>,
/// Filters used to prune primary keys.
filters: Vec<SimpleFilterEvaluator>,
/// Buffer to store pk weights.
pk_weights: Vec<u16>,
}
impl ReadPartitionContext {
pub(crate) fn new(
metadata: RegionMetadataRef,
row_codec: Arc<McmpRowCodec>,
projection: HashSet<ColumnId>,
filters: Vec<SimpleFilterEvaluator>,
) -> ReadPartitionContext {
ReadPartitionContext {
metadata,
row_codec,
projection,
filters,
pk_weights: Vec::new(),
}
}
}
// TODO(yingwen): Pushdown projection to shard readers.
/// Converts a [DataBatch] to a [Batch].
///
/// Columns 1, 2 and 3 of the sliced record batch supply the timestamps,
/// sequences and op types. Columns from index 4 onwards are treated as fields
/// and are only copied when their column id is in `projection`. A missing
/// `key` becomes an empty primary key.
fn data_batch_to_batch(
    metadata: &RegionMetadataRef,
    projection: &HashSet<ColumnId>,
    key: Option<&[u8]>,
    data_batch: DataBatch,
) -> Result<Batch> {
    let record_batch = data_batch.slice_record_batch();

    let primary_key = key.map_or_else(Vec::new, |k| k.to_vec());
    let mut builder = BatchBuilder::new(primary_key);
    builder
        .timestamps_array(record_batch.column(1).clone())?
        .sequences_array(record_batch.column(2).clone())?
        .op_types_array(record_batch.column(3).clone())?;

    // Only walk the schema when there is at least one field column.
    if record_batch.num_columns() > 4 {
        let schema = record_batch.schema();
        let field_columns = schema
            .fields()
            .iter()
            .zip(record_batch.columns().iter())
            .skip(4);
        for (field, array) in field_columns {
            // TODO(yingwen): Avoid finding column by name. We know the schema of a DataBatch.
            // Safety: metadata should contain all fields.
            let column_id = metadata.column_by_name(field.name()).unwrap().column_id;
            if projection.contains(&column_id) {
                builder.push_field_array(column_id, array.clone())?;
            }
        }
    }

    builder.build()
}
/// Inner struct of the partition.
///
@@ -191,7 +397,6 @@ struct Inner {
metadata: RegionMetadataRef,
/// Shard whose dictionary is active.
shard_builder: ShardBuilder,
active_shard_id: ShardId,
/// Shards with frozen dictionary.
shards: Vec<Shard>,
num_rows: usize,
@@ -199,23 +404,21 @@ struct Inner {
}
impl Inner {
fn new(metadata: RegionMetadataRef, shard_builder: ShardBuilder, dedup: bool) -> Self {
let mut inner = Self {
fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> Self {
let (shards, current_shard_id) = if metadata.primary_key.is_empty() {
let data_parts = DataParts::new(metadata.clone(), DATA_INIT_CAP, config.dedup);
(vec![Shard::new(0, None, data_parts, config.dedup)], 1)
} else {
(Vec::new(), 0)
};
let shard_builder = ShardBuilder::new(metadata.clone(), config, current_shard_id);
Self {
metadata,
shard_builder,
active_shard_id: 0,
shards: Vec::new(),
shards,
num_rows: 0,
dedup,
};
if inner.metadata.primary_key.is_empty() {
let data_parts = DataParts::new(inner.metadata.clone(), DATA_INIT_CAP, dedup);
inner.shards.push(Shard::new(0, None, data_parts, dedup));
inner.active_shard_id = 1;
dedup: config.dedup,
}
inner
}
fn find_key_in_shards(&self, primary_key: &[u8]) -> Option<PkId> {
@@ -239,11 +442,7 @@ impl Inner {
}
fn freeze_active_shard(&mut self) -> Result<()> {
if let Some(shard) = self
.shard_builder
.finish(self.active_shard_id, self.metadata.clone())?
{
self.active_shard_id += 1;
if let Some(shard) = self.shard_builder.finish(self.metadata.clone())? {
self.shards.push(shard);
}
Ok(())

View File

@@ -14,11 +14,16 @@
//! Shard in a partition.
use std::cmp::Ordering;
use store_api::metadata::RegionMetadataRef;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::data::{DataParts, DATA_INIT_CAP};
use crate::memtable::merge_tree::data::{DataBatch, DataParts, DataPartsReader, DATA_INIT_CAP};
use crate::memtable::merge_tree::dict::KeyDictRef;
use crate::memtable::merge_tree::merger::{Merger, Node};
use crate::memtable::merge_tree::shard_builder::ShardBuilderReader;
use crate::memtable::merge_tree::{PkId, ShardId};
/// Shard stores data related to the same key dictionary.
@@ -67,8 +72,14 @@ impl Shard {
/// Scans the shard.
// TODO(yingwen): Push down projection to data parts.
pub fn scan(&self) -> ShardReader {
unimplemented!()
/// Creates a reader over the data parts of this shard.
///
/// The reader gets the shard id and a clone of the key dictionary handle.
pub fn read(&mut self) -> Result<ShardReader> {
    let parts_reader = self.data_parts.read()?;
    let shard_id = self.shard_id;
    let key_dict = self.key_dict.clone();

    Ok(ShardReader {
        shard_id,
        key_dict,
        parts_reader,
    })
}
/// Returns the memory size of the shard part.
@@ -91,7 +102,189 @@ impl Shard {
}
/// Reader to read rows in a shard.
pub struct ShardReader {}
pub struct ShardReader {
/// Id of the shard this reader was created from.
shard_id: ShardId,
/// Key dictionary of the shard; `None` means `current_key()` returns `None`.
key_dict: Option<KeyDictRef>,
/// Reader over the data parts of the shard.
parts_reader: DataPartsReader,
}
impl ShardReader {
    /// Returns the id of the shard being read.
    fn shard_id(&self) -> ShardId {
        self.shard_id
    }

    /// Returns true while the parts reader still points at a batch.
    fn is_valid(&self) -> bool {
        self.parts_reader.is_valid()
    }

    /// Advances the parts reader to its next batch.
    fn next(&mut self) -> Result<()> {
        self.parts_reader.next()
    }

    /// Returns the encoded key of the current batch, or `None` when the shard
    /// has no key dictionary.
    fn current_key(&self) -> Option<&[u8]> {
        let pk_index = self.parts_reader.current_data_batch().pk_index();
        let dict = self.key_dict.as_ref()?;
        Some(dict.key_by_pk_index(pk_index))
    }

    /// Returns the pk id (shard id plus pk index) of the current batch.
    fn current_pk_id(&self) -> PkId {
        PkId {
            shard_id: self.shard_id,
            pk_index: self.parts_reader.current_data_batch().pk_index(),
        }
    }

    /// Returns the batch the parts reader currently points at.
    fn current_data_batch(&self) -> DataBatch {
        self.parts_reader.current_data_batch()
    }
}
/// Merges batches from multiple [ShardNode]s through a [Merger].
pub(crate) struct ShardMerger {
/// Underlying merger over the shard nodes.
merger: Merger<ShardNode>,
}
impl ShardMerger {
    /// Builds a merger over `nodes`.
    pub(crate) fn try_new(nodes: Vec<ShardNode>) -> Result<Self> {
        Merger::try_new(nodes).map(|merger| ShardMerger { merger })
    }

    /// Returns true while the merger still has a current node.
    pub(crate) fn is_valid(&self) -> bool {
        self.merger.is_valid()
    }

    /// Advances the merger.
    pub(crate) fn next(&mut self) -> Result<()> {
        self.merger.next()
    }

    /// Returns the pk id of the current node's batch.
    pub(crate) fn current_pk_id(&self) -> PkId {
        let node = self.merger.current_node();
        node.current_pk_id()
    }

    /// Returns the key of the current node's batch, if the node has one.
    pub(crate) fn current_key(&self) -> Option<&[u8]> {
        let node = self.merger.current_node();
        node.current_key()
    }

    /// Returns the current node's batch, sliced from offset 0 with the row
    /// count the merger reports through `current_rows()`.
    pub(crate) fn current_data_batch(&self) -> DataBatch {
        let node = self.merger.current_node();
        node.current_data_batch().slice(0, self.merger.current_rows())
    }
}
/// Source of batches for a [ShardNode].
pub(crate) enum ShardSource {
/// Reads from a shard builder.
Builder(ShardBuilderReader),
/// Reads from a shard.
Shard(ShardReader),
}
// Every method forwards to the reader held by the active variant.
impl ShardSource {
    /// Returns true while the wrapped reader is valid.
    fn is_valid(&self) -> bool {
        match self {
            Self::Builder(reader) => reader.is_valid(),
            Self::Shard(reader) => reader.is_valid(),
        }
    }

    /// Advances the wrapped reader.
    fn next(&mut self) -> Result<()> {
        match self {
            Self::Builder(reader) => reader.next(),
            Self::Shard(reader) => reader.next(),
        }
    }

    /// Returns the pk id of the wrapped reader's current batch.
    fn current_pk_id(&self) -> PkId {
        match self {
            Self::Builder(reader) => reader.current_pk_id(),
            Self::Shard(reader) => reader.current_pk_id(),
        }
    }

    /// Returns the key of the wrapped reader's current batch.
    fn current_key(&self) -> Option<&[u8]> {
        match self {
            Self::Builder(reader) => reader.current_key(),
            Self::Shard(reader) => reader.current_key(),
        }
    }

    /// Returns the wrapped reader's current batch.
    fn current_data_batch(&self) -> DataBatch {
        match self {
            Self::Builder(reader) => reader.current_data_batch(),
            Self::Shard(reader) => reader.current_data_batch(),
        }
    }
}
/// Node for the merger to get items.
///
/// Nodes are compared by the key of their source's current batch.
pub(crate) struct ShardNode {
/// Reader the node pulls batches from.
source: ShardSource,
}
impl ShardNode {
pub(crate) fn new(source: ShardSource) -> Self {
Self { source }
}
fn current_pk_id(&self) -> PkId {
self.source.current_pk_id()
}
fn current_key(&self) -> Option<&[u8]> {
self.source.current_key()
}
fn current_data_batch(&self) -> DataBatch {
self.source.current_data_batch()
}
}
// Two nodes are equal when their current keys are equal.
impl PartialEq for ShardNode {
    fn eq(&self, other: &Self) -> bool {
        let lhs = self.source.current_key();
        let rhs = other.source.current_key();
        lhs == rhs
    }
}

impl Eq for ShardNode {}
// Nodes are ordered by their current key in reverse, so the node with the
// smaller key compares as the greater node.
impl Ord for ShardNode {
    fn cmp(&self, other: &Self) -> Ordering {
        // Swapping the operands yields the reversed ordering.
        let lhs = self.source.current_key();
        let rhs = other.source.current_key();
        rhs.cmp(&lhs)
    }
}
// Partial ordering always agrees with the total ordering defined by `Ord`.
impl PartialOrd for ShardNode {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
// Delegate to `cmp` so there is a single source of truth for the order.
Some(self.cmp(other))
}
}
impl Node for ShardNode {
    /// A node is valid while its source is valid.
    fn is_valid(&self) -> bool {
        self.source.is_valid()
    }

    /// Returns true if this node's current key sorts before `other`'s.
    fn is_behind(&self, other: &Self) -> bool {
        let mine = self.source.current_key();
        let theirs = other.source.current_key();
        // We expect a key only belongs to one shard.
        debug_assert_ne!(mine, theirs);
        mine < theirs
    }

    /// Moves the source to its next batch; `len` is expected to be the row
    /// count of the current batch.
    fn advance(&mut self, len: usize) -> Result<()> {
        debug_assert_eq!(self.source.current_data_batch().num_rows(), len);
        self.source.next()
    }

    /// Returns the number of rows in the current batch.
    fn current_item_len(&self) -> usize {
        let batch = self.current_data_batch();
        batch.num_rows()
    }

    /// Always returns `Err` carrying the row count of the current batch;
    /// `_other` is not inspected.
    fn search_key_in_current_item(&self, _other: &Self) -> Result<usize, usize> {
        let rows = self.source.current_data_batch().num_rows();
        Err(rows)
    }
}
#[cfg(test)]
mod tests {

View File

@@ -14,24 +14,25 @@
//! Builder of a shard.
use std::collections::HashSet;
use std::sync::Arc;
use common_recordbatch::filter::SimpleFilterEvaluator;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::data::{DataBuffer, DataParts, DATA_INIT_CAP};
use crate::memtable::merge_tree::dict::KeyDictBuilder;
use crate::memtable::merge_tree::data::{
DataBatch, DataBuffer, DataBufferReader, DataParts, DATA_INIT_CAP,
};
use crate::memtable::merge_tree::dict::{DictBuilderReader, KeyDictBuilder};
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::shard::Shard;
use crate::memtable::merge_tree::{MergeTreeConfig, ShardId};
use crate::memtable::merge_tree::{MergeTreeConfig, PkId, ShardId};
/// Builder to write keys and data to a shard that the key dictionary
/// is still active.
pub struct ShardBuilder {
/// Id of the current shard to build.
current_shard_id: ShardId,
/// Builder for the key dictionary.
dict_builder: KeyDictBuilder,
/// Buffer to store data.
@@ -43,13 +44,17 @@ pub struct ShardBuilder {
impl ShardBuilder {
/// Returns a new builder.
pub fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> ShardBuilder {
let dedup = config.dedup;
pub fn new(
metadata: RegionMetadataRef,
config: &MergeTreeConfig,
shard_id: ShardId,
) -> ShardBuilder {
ShardBuilder {
current_shard_id: shard_id,
dict_builder: KeyDictBuilder::new(config.index_max_keys_per_shard),
data_buffer: DataBuffer::with_capacity(metadata, DATA_INIT_CAP, dedup),
data_buffer: DataBuffer::with_capacity(metadata, DATA_INIT_CAP, config.dedup),
data_freeze_threshold: config.data_freeze_threshold,
dedup,
dedup: config.dedup,
}
}
@@ -65,15 +70,16 @@ impl ShardBuilder {
self.dict_builder.is_full() || self.data_buffer.num_rows() == self.data_freeze_threshold
}
/// Returns the current shard id of the builder.
///
/// This is the id readers created by this builder report and the id the next
/// shard produced by `finish()` receives.
pub fn current_shard_id(&self) -> ShardId {
self.current_shard_id
}
/// Builds a new shard and resets the builder.
///
/// Returns `None` if the builder is empty.
pub fn finish(
&mut self,
shard_id: ShardId,
metadata: RegionMetadataRef,
) -> Result<Option<Shard>> {
if self.data_buffer.is_empty() {
pub fn finish(&mut self, metadata: RegionMetadataRef) -> Result<Option<Shard>> {
if self.is_empty() {
return Ok(None);
}
@@ -93,24 +99,68 @@ impl ShardBuilder {
let data_parts =
DataParts::new(metadata, DATA_INIT_CAP, self.dedup).with_frozen(vec![data_part]);
let key_dict = key_dict.map(Arc::new);
let shard_id = self.current_shard_id;
self.current_shard_id += 1;
Ok(Some(Shard::new(shard_id, key_dict, data_parts, self.dedup)))
}
/// Scans the shard builder.
pub fn scan(
&mut self,
_projection: &HashSet<ColumnId>,
_filters: &[SimpleFilterEvaluator],
) -> Result<ShardBuilderReader> {
unimplemented!()
/// Creates a reader over the keys and data buffered in this builder.
///
/// `pk_weights_buffer` is filled with the pk weights computed from the
/// dictionary reader and then passed to the data buffer reader.
pub fn read(&mut self, pk_weights_buffer: &mut Vec<u16>) -> Result<ShardBuilderReader> {
    let shard_id = self.current_shard_id;

    let dict_reader = self.dict_builder.read();
    dict_reader.pk_weights_to_sort_data(pk_weights_buffer);

    let data_reader = self.data_buffer.read(Some(pk_weights_buffer))?;

    Ok(ShardBuilderReader {
        shard_id,
        dict_reader,
        data_reader,
    })
}
/// Returns true if the builder is empty.
///
/// Emptiness is decided by the data buffer alone.
pub fn is_empty(&self) -> bool {
self.data_buffer.is_empty()
}
}
/// Reader to scan a shard builder.
pub struct ShardBuilderReader {}
pub struct ShardBuilderReader {
/// Shard id of the builder at the time the reader was created.
shard_id: ShardId,
/// Reader over the key dictionary builder; resolves pk indices to keys.
dict_reader: DictBuilderReader,
/// Reader over the data buffer of the builder.
data_reader: DataBufferReader,
}
// TODO(yingwen): Can we use generic for data reader?
impl ShardBuilderReader {
    /// Returns the shard id the reader was created with.
    pub fn shard_id(&self) -> ShardId {
        self.shard_id
    }

    /// Returns true while the data reader still points at a batch.
    pub fn is_valid(&self) -> bool {
        self.data_reader.is_valid()
    }

    /// Advances the data reader to its next batch.
    pub fn next(&mut self) -> Result<()> {
        self.data_reader.next()
    }

    /// Returns the key of the current batch, looked up in the dictionary
    /// reader; this reader always yields `Some`.
    pub fn current_key(&self) -> Option<&[u8]> {
        let batch = self.data_reader.current_data_batch();
        let key = self.dict_reader.key_by_pk_index(batch.pk_index());
        Some(key)
    }

    /// Returns the pk id (shard id plus pk index) of the current batch.
    pub fn current_pk_id(&self) -> PkId {
        PkId {
            shard_id: self.shard_id,
            pk_index: self.data_reader.current_data_batch().pk_index(),
        }
    }

    /// Returns the batch the data reader currently points at.
    pub fn current_data_batch(&self) -> DataBatch {
        self.data_reader.current_data_batch()
    }
}
#[cfg(test)]
mod tests {
@@ -179,9 +229,10 @@ mod tests {
let metadata = metadata_for_test();
let input = input_with_key(&metadata);
let config = MergeTreeConfig::default();
let mut shard_builder = ShardBuilder::new(metadata.clone(), &config);
let mut shard_builder = ShardBuilder::new(metadata.clone(), &config, 1);
let mut metrics = WriteMetrics::default();
assert!(shard_builder.finish(1, metadata.clone()).unwrap().is_none());
assert!(shard_builder.finish(metadata.clone()).unwrap().is_none());
assert_eq!(1, shard_builder.current_shard_id);
for key_values in &input {
for kv in key_values.iter() {
@@ -189,6 +240,8 @@ mod tests {
shard_builder.write_with_key(&key, kv, &mut metrics);
}
}
shard_builder.finish(1, metadata).unwrap().unwrap();
let shard = shard_builder.finish(metadata).unwrap().unwrap();
assert_eq!(1, shard.shard_id);
assert_eq!(2, shard_builder.current_shard_id);
}
}

View File

@@ -32,7 +32,7 @@ use crate::error::{PrimaryKeyLengthMismatchSnafu, Result};
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::partition::{
Partition, PartitionKey, PartitionReader, PartitionRef,
Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext,
};
use crate::memtable::merge_tree::MergeTreeConfig;
use crate::memtable::time_series::primary_key_schema;
@@ -122,7 +122,7 @@ impl MergeTree {
}
/// Scans the tree.
pub fn scan(
pub fn read(
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
@@ -151,16 +151,21 @@ impl MergeTree {
.map(|pk| pk.column_schema.data_type.clone())
.collect();
let iter = TreeIter {
let mut iter = TreeIter {
metadata: self.metadata.clone(),
pk_schema,
pk_datatypes,
projection,
filters,
row_codec: self.row_codec.clone(),
partitions,
current_reader: None,
};
let context = ReadPartitionContext::new(
self.metadata.clone(),
self.row_codec.clone(),
projection,
filters,
);
iter.fetch_next_partition(context)?;
Ok(Box::new(iter))
}
@@ -281,8 +286,6 @@ struct TreeIter {
metadata: RegionMetadataRef,
pk_schema: arrow::datatypes::SchemaRef,
pk_datatypes: Vec<ConcreteDataType>,
projection: HashSet<ColumnId>,
filters: Vec<SimpleFilterEvaluator>,
row_codec: Arc<McmpRowCodec>,
partitions: VecDeque<PartitionRef>,
current_reader: Option<PartitionReader>,
@@ -292,6 +295,44 @@ impl Iterator for TreeIter {
type Item = Result<Batch>;
fn next(&mut self) -> Option<Self::Item> {
unimplemented!()
self.next_batch().transpose()
}
}
impl TreeIter {
    /// Pops partitions until one yields a valid reader and stores that reader
    /// as the current reader.
    ///
    /// The context is recovered from each invalid reader and reused for the
    /// next partition. The current reader is left untouched when no remaining
    /// partition has data.
    fn fetch_next_partition(&mut self, mut context: ReadPartitionContext) -> Result<()> {
        while let Some(partition) = self.partitions.pop_front() {
            let part_reader = partition.read(context)?;
            if part_reader.is_valid() {
                self.current_reader = Some(part_reader);
                break;
            }
            // Nothing to read here; take the buffers back and try the next one.
            context = part_reader.into_context();
        }

        Ok(())
    }

    /// Returns the current batch and moves on to the next one, switching to
    /// the next partition when the current reader is exhausted.
    ///
    /// Returns `Ok(None)` once there is no current reader.
    fn next_batch(&mut self) -> Result<Option<Batch>> {
        let Some(part_reader) = &mut self.current_reader else {
            return Ok(None);
        };

        debug_assert!(part_reader.is_valid());
        let batch = part_reader.convert_current_batch()?;
        part_reader.next()?;

        if !part_reader.is_valid() {
            // Safety: current reader is Some.
            let exhausted = self.current_reader.take().unwrap();
            self.fetch_next_partition(exhausted.into_context())?;
        }

        Ok(Some(batch))
    }
}