feat: Implement dedup for the new memtable and expose the config (#3377)

* fix: KeyValues num_fields() is incorrect

* chore: fix warnings

* feat: support dedup

* feat: allow using the new memtable

* feat: serde default for config

* fix: resets pk index after finishing a dict
This commit is contained in:
Yingwen
2024-02-25 21:06:01 +08:00
committed by GitHub
parent 606309f49a
commit e481f073f5
11 changed files with 226 additions and 149 deletions

View File

@@ -24,6 +24,7 @@ use serde::{Deserialize, Serialize};
use serde_with::{serde_as, NoneAsEmptyString};
use crate::error::Result;
use crate::memtable::merge_tree::MergeTreeConfig;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
/// Default max running background job.
@@ -102,6 +103,9 @@ pub struct MitoConfig {
/// Inverted index configs.
pub inverted_index: InvertedIndexConfig,
/// Experimental memtable.
pub experimental_memtable: Option<MergeTreeConfig>,
}
impl Default for MitoConfig {
@@ -127,6 +131,7 @@ impl Default for MitoConfig {
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
allow_stale_entries: false,
inverted_index: InvertedIndexConfig::default(),
experimental_memtable: None,
};
// Adjust buffer and cache size according to system memory if we can.

View File

@@ -550,11 +550,11 @@ async fn test_region_usage() {
flush_region(&engine, region_id, None).await;
let region_stat = region.region_usage().await;
assert_eq!(region_stat.wal_usage, 0);
assert_eq!(region_stat.sst_usage, 2962);
// region total usage
assert_eq!(region_stat.disk_usage(), 4028);
// Some memtables may share items.
assert!(region_stat.disk_usage() >= 4028);
}
#[tokio::test]

View File

@@ -124,7 +124,7 @@ impl<'a> KeyValue<'a> {
/// Get number of field columns.
pub fn num_fields(&self) -> usize {
self.row.values.len() - self.helper.num_primary_key_column - 1
self.helper.indices.len() - self.helper.num_primary_key_column - 1
}
/// Get sequence.
@@ -261,7 +261,13 @@ mod tests {
}
}
fn check_key_values(kvs: &KeyValues, num_rows: usize, keys: &[i64], ts: i64, values: &[i64]) {
fn check_key_values(
kvs: &KeyValues,
num_rows: usize,
keys: &[Option<i64>],
ts: i64,
values: &[Option<i64>],
) {
assert_eq!(num_rows, kvs.num_rows());
let mut expect_seq = START_SEQ;
let expect_ts = ValueRef::Int64(ts);
@@ -273,10 +279,10 @@ mod tests {
assert_eq!(values.len(), kv.num_fields());
assert_eq!(expect_ts, kv.timestamp());
let expect_keys: Vec<_> = keys.iter().map(|k| ValueRef::Int64(*k)).collect();
let expect_keys: Vec<_> = keys.iter().map(|k| ValueRef::from(*k)).collect();
let actual_keys: Vec<_> = kv.primary_keys().collect();
assert_eq!(expect_keys, actual_keys);
let expect_values: Vec<_> = values.iter().map(|v| ValueRef::Int64(*v)).collect();
let expect_values: Vec<_> = values.iter().map(|v| ValueRef::from(*v)).collect();
let actual_values: Vec<_> = kv.fields().collect();
assert_eq!(expect_values, actual_values);
}
@@ -312,7 +318,7 @@ mod tests {
// KeyValues
// keys: [k0=2, k1=0]
// ts: 1,
check_key_values(&kvs, 3, &[2, 0], 1, &[]);
check_key_values(&kvs, 3, &[Some(2), Some(0)], 1, &[]);
}
#[test]
@@ -325,7 +331,7 @@ mod tests {
// KeyValues (note that v0 is in front of v1 in region schema)
// ts: 2,
// fields: [v0=1, v1=0]
check_key_values(&kvs, 3, &[], 2, &[1, 0]);
check_key_values(&kvs, 3, &[], 2, &[Some(1), Some(0)]);
}
#[test]
@@ -339,6 +345,34 @@ mod tests {
// keys: [k0=0, k1=3]
// ts: 2,
// fields: [v0=1, v1=4]
check_key_values(&kvs, 3, &[0, 3], 2, &[1, 4]);
check_key_values(&kvs, 3, &[Some(0), Some(3)], 2, &[Some(1), Some(4)]);
}
#[test]
fn test_sparse_field() {
    let metadata = new_region_metadata(2, 2);
    // Every row carries k0=0, v0=1, ts=2, k1=3; field v1 is omitted and
    // therefore expected to decode as null.
    let sparse_mutation = new_mutation(&["k0", "v0", "ts", "k1"], 3);
    let key_values = KeyValues::new(&metadata, sparse_mutation).unwrap();
    // Expected view per row:
    //   keys:   [k0=0, k1=3]
    //   ts:     2
    //   fields: [v0=1, v1=null]
    check_key_values(
        &key_values,
        3,
        &[Some(0), Some(3)],
        2,
        &[Some(1), None],
    );
}
#[test]
fn test_sparse_tag_field() {
    let metadata = new_region_metadata(2, 2);
    // Every row carries k0=0, v0=1, ts=2; both the tag k1 and the field v1
    // are omitted and therefore expected to decode as null.
    let sparse_mutation = new_mutation(&["k0", "v0", "ts"], 3);
    let key_values = KeyValues::new(&metadata, sparse_mutation).unwrap();
    // Expected view per row:
    //   keys:   [k0=0, k1=null]
    //   ts:     2
    //   fields: [v0=1, v1=null]
    check_key_values(
        &key_values,
        3,
        &[Some(0), None],
        2,
        &[Some(1), None],
    );
}
}

View File

@@ -28,6 +28,7 @@ use std::fmt;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
@@ -54,7 +55,8 @@ struct PkId {
}
/// Config for the merge tree memtable.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct MergeTreeConfig {
/// Max keys in an index shard.
pub index_max_keys_per_shard: usize,
@@ -248,16 +250,19 @@ impl MergeTreeMemtable {
/// Builder to build a [MergeTreeMemtable].
#[derive(Debug, Default)]
pub struct MergeTreeMemtableBuilder {
write_buffer_manager: Option<WriteBufferManagerRef>,
config: MergeTreeConfig,
write_buffer_manager: Option<WriteBufferManagerRef>,
}
impl MergeTreeMemtableBuilder {
/// Creates a new builder with specific `write_buffer_manager`.
pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> Self {
pub fn new(
config: MergeTreeConfig,
write_buffer_manager: Option<WriteBufferManagerRef>,
) -> Self {
Self {
config,
write_buffer_manager,
config: MergeTreeConfig::default(),
}
}
}
@@ -420,7 +425,8 @@ mod tests {
memtable_util::metadata_with_primary_key(vec![], false)
};
// Try to build a memtable via the builder.
let memtable = MergeTreeMemtableBuilder::new(None).build(1, &metadata);
let memtable =
MergeTreeMemtableBuilder::new(MergeTreeConfig::default(), None).build(1, &metadata);
let expect = (0..100).collect::<Vec<_>>();
let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);

View File

@@ -883,29 +883,6 @@ impl DataPartsReader {
}
}
#[cfg(test)]
pub(crate) fn write_rows_to_buffer(
    buffer: &mut DataBuffer,
    schema: &RegionMetadataRef,
    pk_index: u16,
    ts: Vec<i64>,
    v0: Vec<Option<f64>>,
    sequence: u64,
) {
    // Build key-values from the given timestamps/values, then append every
    // row to the buffer under the provided primary-key index.
    let key_values = crate::test_util::memtable_util::build_key_values_with_ts_seq_values(
        schema,
        "whatever".to_string(),
        1,
        ts.into_iter(),
        v0.into_iter(),
        sequence,
    );
    key_values
        .iter()
        .for_each(|kv| buffer.write_row(pk_index, kv));
}
#[cfg(test)]
mod tests {
use datafusion::arrow::array::Float64Array;
@@ -914,7 +891,9 @@ mod tests {
use parquet::data_type::AsBytes;
use super::*;
use crate::test_util::memtable_util::{extract_data_batch, metadata_for_test};
use crate::test_util::memtable_util::{
extract_data_batch, metadata_for_test, write_rows_to_buffer,
};
#[test]
fn test_lazy_mutable_vector_builder() {

View File

@@ -16,39 +16,19 @@ use std::ops::Range;
use crate::error::Result;
use crate::memtable::merge_tree::data::DataBatch;
use crate::memtable::merge_tree::shard::DataBatchSource;
use crate::memtable::merge_tree::PkId;
/// A source of sorted data batches that a dedup reader can deduplicate.
///
/// Implementors yield batches grouped by primary key; callers drive the
/// cursor with [`DedupSource::next`] and inspect the current position via
/// the accessor methods.
pub trait DedupSource {
    /// Returns whether current source is still valid.
    fn is_valid(&self) -> bool;

    /// Advances source to next data batch.
    fn next(&mut self) -> Result<()>;

    /// Returns current pk id.
    ///
    /// # Panics
    /// If source is not valid.
    fn current_pk_id(&self) -> PkId;

    /// Returns the current primary key bytes.
    ///
    /// # Panics
    /// If source is not valid.
    fn current_key(&self) -> &[u8];

    /// Returns the data part.
    ///
    /// # Panics
    /// If source is not valid.
    fn current_data_batch(&self) -> DataBatch;
}
struct DedupReader<T> {
/// A reader that dedup sorted batches from a merger.
pub struct DedupReader<T> {
prev_batch_last_row: Option<(PkId, i64)>,
current_batch_range: Option<Range<usize>>,
inner: T,
}
impl<T: DedupSource> DedupReader<T> {
fn try_new(inner: T) -> Result<Self> {
impl<T: DataBatchSource> DedupReader<T> {
/// Creates a new dedup reader.
pub fn try_new(inner: T) -> Result<Self> {
let mut res = Self {
prev_batch_last_row: None,
current_batch_range: None,
@@ -57,24 +37,13 @@ impl<T: DedupSource> DedupReader<T> {
res.next()?;
Ok(res)
}
}
impl<T: DataBatchSource> DataBatchSource for DedupReader<T> {
fn is_valid(&self) -> bool {
self.current_batch_range.is_some()
}
/// Returns current encoded primary key.
/// # Panics
/// If inner reader is exhausted.
fn current_key(&self) -> &[u8] {
self.inner.current_key()
}
fn current_data_batch(&self) -> DataBatch {
let range = self.current_batch_range.as_ref().unwrap();
let data_batch = self.inner.current_data_batch();
data_batch.slice(range.start, range.len())
}
fn next(&mut self) -> Result<()> {
loop {
match &mut self.prev_batch_last_row {
@@ -122,6 +91,20 @@ impl<T: DedupSource> DedupReader<T> {
}
Ok(())
}
fn current_pk_id(&self) -> PkId {
self.inner.current_pk_id()
}
fn current_key(&self) -> Option<&[u8]> {
self.inner.current_key()
}
fn current_data_batch(&self) -> DataBatch {
let range = self.current_batch_range.as_ref().unwrap();
let data_batch = self.inner.current_data_batch();
data_batch.slice(range.start, range.len())
}
}
#[cfg(test)]
@@ -129,33 +112,35 @@ mod tests {
use store_api::metadata::RegionMetadataRef;
use super::*;
use crate::memtable::merge_tree::data::{
write_rows_to_buffer, DataBuffer, DataParts, DataPartsReader,
use crate::memtable::merge_tree::data::{DataBuffer, DataParts, DataPartsReader};
use crate::test_util::memtable_util::{
extract_data_batch, metadata_for_test, write_rows_to_buffer,
};
use crate::test_util::memtable_util::{extract_data_batch, metadata_for_test};
impl DedupSource for DataPartsReader {
struct MockSource(DataPartsReader);
impl DataBatchSource for MockSource {
fn is_valid(&self) -> bool {
self.is_valid()
self.0.is_valid()
}
fn next(&mut self) -> Result<()> {
self.next()
self.0.next()
}
fn current_pk_id(&self) -> PkId {
PkId {
shard_id: 0,
pk_index: self.current_data_batch().pk_index(),
pk_index: self.0.current_data_batch().pk_index(),
}
}
fn current_key(&self) -> &[u8] {
b"abcf"
fn current_key(&self) -> Option<&[u8]> {
None
}
fn current_data_batch(&self) -> DataBatch {
self.current_data_batch()
self.0.current_data_batch()
}
}
@@ -194,7 +179,7 @@ mod tests {
let mut parts = DataParts::new(meta, 10, true).with_frozen(frozens);
let mut res = Vec::with_capacity(expected.len());
let mut reader = DedupReader::try_new(parts.read().unwrap()).unwrap();
let mut reader = DedupReader::try_new(MockSource(parts.read().unwrap())).unwrap();
while reader.is_valid() {
let batch = reader.current_data_batch();
res.push(extract_data_batch(&batch));

View File

@@ -80,7 +80,7 @@ impl KeyDictBuilder {
if self.key_buffer.len() >= MAX_KEYS_PER_BLOCK.into() {
// The write buffer is full. Freeze a dict block.
let dict_block = self.key_buffer.finish();
let dict_block = self.key_buffer.finish(false);
self.dict_blocks.push(dict_block);
}
@@ -113,8 +113,8 @@ impl KeyDictBuilder {
return None;
}
// Finishes current dict block.
let dict_block = self.key_buffer.finish();
// Finishes current dict block and resets the pk index.
let dict_block = self.key_buffer.finish(true);
self.dict_blocks.push(dict_block);
// Takes the pk to index map.
let mut pk_to_index = std::mem::take(&mut self.pk_to_index);
@@ -317,12 +317,15 @@ impl KeyBuffer {
.unwrap_or(0)
}
fn finish(&mut self) -> DictBlock {
/// Consumes the buffered keys and returns them as a [DictBlock].
///
/// When `reset_index` is true, also resets `next_pk_index` to 0 so the next
/// dict starts assigning pk indices from scratch.
fn finish(&mut self, reset_index: bool) -> DictBlock {
    let primary_key = self.key_builder.finish();
    // Reserve capacity for the new builder. `finish()` the builder will leave the builder
    // empty with capacity 0.
    // TODO(yingwen): Do we need to reserve capacity for data?
    self.key_builder = BinaryBuilder::with_capacity(primary_key.len(), 0);
    if reset_index {
        self.next_pk_index = 0;
    }
    DictBlock::new(primary_key)
}

View File

@@ -28,8 +28,11 @@ use store_api::storage::ColumnId;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::data::{DataBatch, DataParts, DATA_INIT_CAP};
use crate::memtable::merge_tree::dedup::DedupReader;
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::shard::{Shard, ShardMerger, ShardNode, ShardSource};
use crate::memtable::merge_tree::shard::{
BoxedDataBatchSource, Shard, ShardMerger, ShardNode, ShardSource,
};
use crate::memtable::merge_tree::shard_builder::ShardBuilder;
use crate::memtable::merge_tree::{MergeTreeConfig, PkId};
use crate::read::{Batch, BatchBuilder};
@@ -41,6 +44,8 @@ pub type PartitionKey = u32;
/// A tree partition.
pub struct Partition {
inner: RwLock<Inner>,
/// Whether to dedup batches.
dedup: bool,
}
pub type PartitionRef = Arc<Partition>;
@@ -50,6 +55,7 @@ impl Partition {
pub fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> Self {
Partition {
inner: RwLock::new(Inner::new(metadata, config)),
dedup: config.dedup,
}
}
@@ -113,16 +119,13 @@ impl Partition {
};
// Creating a shard merger will invoke next so we do it outside of the lock.
let shard_merger = ShardMerger::try_new(nodes)?;
Ok(PartitionReader {
metadata: context.metadata,
row_codec: context.row_codec,
projection: context.projection,
filters: context.filters,
pk_weights: context.pk_weights,
shard_merger,
last_yield_pk_id: None,
})
let merger = ShardMerger::try_new(nodes)?;
if self.dedup {
let source = DedupReader::try_new(merger)?;
PartitionReader::new(context, Box::new(source))
} else {
PartitionReader::new(context, Box::new(merger))
}
}
/// Freezes the partition.
@@ -156,8 +159,8 @@ impl Partition {
shard_builder,
shards,
num_rows: 0,
dedup: config.dedup,
}),
dedup: self.dedup,
}
}
@@ -214,51 +217,52 @@ pub struct PartitionReader {
projection: HashSet<ColumnId>,
filters: Vec<SimpleFilterEvaluator>,
pk_weights: Vec<u16>,
shard_merger: ShardMerger,
source: BoxedDataBatchSource,
last_yield_pk_id: Option<PkId>,
}
impl PartitionReader {
/// Creates a reader over `source`, taking ownership of the read `context`.
///
/// Eagerly advances to the first batch whose primary key passes the filters,
/// so a freshly created reader is already positioned on usable data (or is
/// exhausted if nothing matches).
fn new(context: ReadPartitionContext, source: BoxedDataBatchSource) -> Result<Self> {
    let mut reader = Self {
        metadata: context.metadata,
        row_codec: context.row_codec,
        projection: context.projection,
        filters: context.filters,
        pk_weights: context.pk_weights,
        source,
        // No batch has been yielded yet.
        last_yield_pk_id: None,
    };
    // Find next valid batch.
    reader.prune_batch_by_key()?;
    Ok(reader)
}
/// Returns true if the reader is valid.
pub fn is_valid(&self) -> bool {
self.shard_merger.is_valid()
self.source.is_valid()
}
/// Advances the reader.
///
/// # Panics
/// Panics if the reader is invalid.
pub fn next(&mut self) -> Result<()> {
self.shard_merger.next()?;
self.source.next()?;
if self.metadata.primary_key.is_empty() {
// Nothing to prune.
return Ok(());
}
while self.shard_merger.is_valid() {
let pk_id = self.shard_merger.current_pk_id();
if let Some(yield_pk_id) = self.last_yield_pk_id {
if pk_id == yield_pk_id {
// If this batch has the same key as last returned batch.
// We can return it without evaluating filters.
break;
}
}
let key = self.shard_merger.current_key().unwrap();
// Prune batch by primary key.
if prune_primary_key(&self.metadata, &self.filters, &self.row_codec, key) {
// We need this key.
self.last_yield_pk_id = Some(pk_id);
break;
}
self.shard_merger.next()?;
}
Ok(())
self.prune_batch_by_key()
}
/// Converts current data batch into a [Batch].
///
/// # Panics
/// Panics if the reader is invalid.
pub fn convert_current_batch(&self) -> Result<Batch> {
let data_batch = self.shard_merger.current_data_batch();
let data_batch = self.source.current_data_batch();
data_batch_to_batch(
&self.metadata,
&self.projection,
self.shard_merger.current_key(),
self.source.current_key(),
data_batch,
)
}
@@ -272,6 +276,34 @@ impl PartitionReader {
pk_weights: self.pk_weights,
}
}
/// Skips batches whose primary key does not satisfy the filters.
///
/// Stops on the first batch that either shares the pk id of the last
/// yielded batch (its key is already known to pass, so filters are not
/// re-evaluated) or whose key passes `prune_primary_key`. Leaves the
/// source invalid if no remaining batch matches.
fn prune_batch_by_key(&mut self) -> Result<()> {
    if self.metadata.primary_key.is_empty() {
        // Nothing to prune.
        return Ok(());
    }

    while self.source.is_valid() {
        let pk_id = self.source.current_pk_id();
        if let Some(yield_pk_id) = self.last_yield_pk_id {
            if pk_id == yield_pk_id {
                // If this batch has the same key as last returned batch.
                // We can return it without evaluating filters.
                break;
            }
        }
        // The region has a primary key, so the source must provide key bytes here.
        let key = self.source.current_key().unwrap();
        // Prune batch by primary key.
        if prune_primary_key(&self.metadata, &self.filters, &self.row_codec, key) {
            // We need this key.
            self.last_yield_pk_id = Some(pk_id);
            break;
        }
        self.source.next()?;
    }

    Ok(())
}
}
// TODO(yingwen): Improve performance of key pruning. Now we need to find index and
@@ -400,7 +432,6 @@ struct Inner {
/// Shards with frozen dictionary.
shards: Vec<Shard>,
num_rows: usize,
dedup: bool,
}
impl Inner {
@@ -417,7 +448,6 @@ impl Inner {
shard_builder,
shards,
num_rows: 0,
dedup: config.dedup,
}
}

View File

@@ -101,6 +101,33 @@ impl Shard {
}
}
/// Source that returns [DataBatch].
///
/// Cursor-style interface: callers drive it with [`DataBatchSource::next`]
/// and read the current position through the accessor methods while
/// [`DataBatchSource::is_valid`] returns true.
pub trait DataBatchSource {
    /// Returns whether current source is still valid.
    fn is_valid(&self) -> bool;

    /// Advances source to next data batch.
    fn next(&mut self) -> Result<()>;

    /// Returns current pk id.
    ///
    /// # Panics
    /// If source is not valid.
    fn current_pk_id(&self) -> PkId;

    /// Returns the current primary key bytes or None if it doesn't have primary key.
    ///
    /// # Panics
    /// If source is not valid.
    fn current_key(&self) -> Option<&[u8]>;

    /// Returns the data part.
    ///
    /// # Panics
    /// If source is not valid.
    fn current_data_batch(&self) -> DataBatch;
}

/// A boxed [DataBatchSource] that can be sent across threads, for use behind
/// dynamic dispatch.
pub type BoxedDataBatchSource = Box<dyn DataBatchSource + Send>;
/// Reader to read rows in a shard.
pub struct ShardReader {
shard_id: ShardId,
@@ -141,6 +168,7 @@ impl ShardReader {
}
}
/// A merger that merges batches from multiple shards.
pub(crate) struct ShardMerger {
    /// Underlying merger over the shard nodes to read from.
    merger: Merger<ShardNode>,
}
@@ -150,24 +178,26 @@ impl ShardMerger {
let merger = Merger::try_new(nodes)?;
Ok(ShardMerger { merger })
}
}
pub(crate) fn is_valid(&self) -> bool {
impl DataBatchSource for ShardMerger {
fn is_valid(&self) -> bool {
self.merger.is_valid()
}
pub(crate) fn next(&mut self) -> Result<()> {
fn next(&mut self) -> Result<()> {
self.merger.next()
}
pub(crate) fn current_pk_id(&self) -> PkId {
fn current_pk_id(&self) -> PkId {
self.merger.current_node().current_pk_id()
}
pub(crate) fn current_key(&self) -> Option<&[u8]> {
fn current_key(&self) -> Option<&[u8]> {
self.merger.current_node().current_key()
}
pub(crate) fn current_data_batch(&self) -> DataBatch {
fn current_data_batch(&self) -> DataBatch {
let batch = self.merger.current_node().current_data_batch();
batch.slice(0, self.merger.current_rows())
}

View File

@@ -25,7 +25,7 @@ use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}
use store_api::storage::RegionId;
use crate::manifest::action::RegionEdit;
use crate::memtable::{MemtableBuilder, MemtableBuilderRef};
use crate::memtable::MemtableBuilder;
use crate::region::version::{Version, VersionBuilder, VersionControl};
use crate::sst::file::{FileId, FileMeta};
use crate::sst::file_purger::FilePurgerRef;
@@ -79,10 +79,6 @@ impl VersionControlBuilder {
self.file_purger.clone()
}
pub(crate) fn memtable_builder(&self) -> MemtableBuilderRef {
self.memtable_builder.clone()
}
pub(crate) fn push_l0_file(&mut self, start_ms: i64, end_ms: i64) -> &mut Self {
let file_id = FileId::random();
self.files.insert(

View File

@@ -49,6 +49,7 @@ use crate::config::MitoConfig;
use crate::error::{InvalidRequestSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::manifest::action::RegionEdit;
use crate::memtable::merge_tree::MergeTreeMemtableBuilder;
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::memtable::MemtableBuilderRef;
use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
@@ -323,6 +324,16 @@ impl<S: LogStore> WorkerStarter<S> {
let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);
let running = Arc::new(AtomicBool::new(true));
let memtable_builder = if let Some(config) = &self.config.experimental_memtable {
Arc::new(MergeTreeMemtableBuilder::new(
config.clone(),
Some(self.write_buffer_manager.clone()),
)) as _
} else {
Arc::new(TimeSeriesMemtableBuilder::new(Some(
self.write_buffer_manager.clone(),
))) as _
};
let mut worker_thread = RegionWorkerLoop {
id: self.id,
config: self.config,
@@ -333,9 +344,7 @@ impl<S: LogStore> WorkerStarter<S> {
wal: Wal::new(self.log_store),
object_store_manager: self.object_store_manager.clone(),
running: running.clone(),
memtable_builder: Arc::new(TimeSeriesMemtableBuilder::new(Some(
self.write_buffer_manager.clone(),
))),
memtable_builder,
scheduler: self.scheduler.clone(),
write_buffer_manager: self.write_buffer_manager,
flush_scheduler: FlushScheduler::new(self.scheduler.clone()),