feat: merge tree dedup reader (#3375)

* feat: add dedup option to merge tree component

* feat: impl dedup reader for shard reader

* refactor: DedupReader::new to DedupReader::try_new

* refactor: remove DedupReader::current_key field

* fix: some cr comments

* fix: fmt

* fix: remove shard_id method from DedupSource
Authored by Lei, HUANG on 2024-02-24 21:50:49 +08:00; committed by GitHub
parent abbfd23d4b
commit afe4633320
8 changed files with 425 additions and 87 deletions

View File

@@ -14,7 +14,8 @@
//! Memtable implementation based on a merge tree.
mod data;
pub(crate) mod data;
mod dedup;
mod dict;
mod merger;
mod metrics;
@@ -59,6 +60,8 @@ pub struct MergeTreeConfig {
pub index_max_keys_per_shard: usize,
/// Number of rows to freeze a data part.
pub data_freeze_threshold: usize,
/// Whether to delete duplicate rows.
pub dedup: bool,
}
impl Default for MergeTreeConfig {
@@ -66,6 +69,7 @@ impl Default for MergeTreeConfig {
Self {
index_max_keys_per_shard: 8192,
data_freeze_threshold: 102400,
dedup: true,
}
}
}
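
The option defaults to true, preserving the previous always-dedup behavior; callers opt out per memtable. A minimal sketch of disabling it, assuming `metadata` is a `RegionMetadataRef` already in scope and using struct-update syntax over the `Default` impl above (the `ShardBuilder::new` signature appears later in this diff):

let config = MergeTreeConfig {
    dedup: false, // keep duplicate rows instead of removing them
    ..Default::default()
};
let builder = ShardBuilder::new(metadata.clone(), &config);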

View File

@@ -63,11 +63,11 @@ pub(crate) struct DataBatchRange {
impl DataBatchRange {
pub(crate) fn len(&self) -> usize {
(self.start..self.end).len()
self.end - self.start
}
pub(crate) fn is_empty(&self) -> bool {
(self.start..self.end).is_empty()
self.len() == 0
}
}
@@ -163,6 +163,10 @@ impl<'a> DataBatch<'a> {
},
}
}
pub(crate) fn num_rows(&self) -> usize {
self.range.len()
}
}
/// Buffer for the value part (pk_index, ts, sequence, op_type, field columns) in a shard.
@@ -180,11 +184,13 @@ pub struct DataBuffer {
op_type_builder: UInt8VectorBuilder,
/// Builders for field columns.
field_builders: Vec<LazyMutableVectorBuilder>,
dedup: bool,
}
impl DataBuffer {
/// Creates a `DataBuffer` instance with given schema and capacity.
pub fn with_capacity(metadata: RegionMetadataRef, init_capacity: usize) -> Self {
pub fn with_capacity(metadata: RegionMetadataRef, init_capacity: usize, dedup: bool) -> Self {
let ts_builder = metadata
.time_index_column()
.column_schema
@@ -209,6 +215,7 @@ impl DataBuffer {
sequence_builder,
op_type_builder,
field_builders,
dedup,
}
}
@@ -237,7 +244,13 @@ impl DataBuffer {
pk_weights: Option<&[u16]>,
replace_pk_index: bool,
) -> Result<DataPart> {
let encoder = DataPartEncoder::new(&self.metadata, pk_weights, None, replace_pk_index);
let encoder = DataPartEncoder::new(
&self.metadata,
pk_weights,
None,
replace_pk_index,
self.dedup,
);
let parts = encoder.write(self)?;
Ok(parts)
}
@@ -246,13 +259,12 @@ impl DataBuffer {
/// If pk_weights is present, yielded rows are sorted according to weights,
/// otherwise rows are sorted by "pk_index" values, which then act as the weights.
pub fn read(&mut self, pk_weights: Option<&[u16]>) -> Result<DataBufferReader> {
// todo(hl): control whether to dedup while invoking `read`.
let batch = data_buffer_to_record_batches(
self.data_part_schema.clone(),
self,
pk_weights,
true,
true,
self.dedup,
// replace_pk_index is always set to false since:
// - for DataBuffer in ShardBuilder, the pk dict is not frozen yet
// - for DataBuffer in Shard, values in the pk_index column have already been replaced during `freeze`.
@@ -629,6 +641,7 @@ struct DataPartEncoder<'a> {
pk_weights: Option<&'a [u16]>,
row_group_size: Option<usize>,
replace_pk_index: bool,
dedup: bool,
}
impl<'a> DataPartEncoder<'a> {
@@ -637,6 +650,7 @@ impl<'a> DataPartEncoder<'a> {
pk_weights: Option<&'a [u16]>,
row_group_size: Option<usize>,
replace_pk_index: bool,
dedup: bool,
) -> DataPartEncoder<'a> {
let schema = memtable_schema_to_encoded_schema(metadata);
Self {
@@ -644,6 +658,7 @@ impl<'a> DataPartEncoder<'a> {
pk_weights,
row_group_size,
replace_pk_index,
dedup,
}
}
@@ -663,7 +678,7 @@ impl<'a> DataPartEncoder<'a> {
source,
self.pk_weights,
false,
true,
self.dedup,
self.replace_pk_index,
)?;
writer.write(&rb).context(error::EncodeMemtableSnafu)?;
@@ -803,9 +818,9 @@ pub struct DataParts {
}
impl DataParts {
pub(crate) fn new(metadata: RegionMetadataRef, capacity: usize) -> Self {
pub(crate) fn new(metadata: RegionMetadataRef, capacity: usize, dedup: bool) -> Self {
Self {
active: DataBuffer::with_capacity(metadata, capacity),
active: DataBuffer::with_capacity(metadata, capacity, dedup),
frozen: Vec::new(),
}
}
@@ -868,6 +883,29 @@ impl DataPartsReader {
}
}
#[cfg(test)]
pub(crate) fn write_rows_to_buffer(
buffer: &mut DataBuffer,
schema: &RegionMetadataRef,
pk_index: u16,
ts: Vec<i64>,
v0: Vec<Option<f64>>,
sequence: u64,
) {
let kvs = crate::test_util::memtable_util::build_key_values_with_ts_seq_values(
schema,
"whatever".to_string(),
1,
ts.into_iter(),
v0.into_iter(),
sequence,
);
for kv in kvs.iter() {
buffer.write_row(pk_index, kv);
}
}
#[cfg(test)]
mod tests {
use datafusion::arrow::array::Float64Array;
@@ -876,7 +914,7 @@ mod tests {
use parquet::data_type::AsBytes;
use super::*;
use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
use crate::test_util::memtable_util::{extract_data_batch, metadata_for_test};
#[test]
fn test_lazy_mutable_vector_builder() {
@@ -900,7 +938,7 @@ mod tests {
fn check_test_data_buffer_to_record_batches(keep_data: bool) {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
write_rows_to_buffer(&mut buffer, &meta, 0, vec![1, 2], vec![Some(0.1), None], 1);
write_rows_to_buffer(&mut buffer, &meta, 1, vec![1, 2], vec![Some(1.1), None], 2);
@@ -968,10 +1006,50 @@ mod tests {
check_test_data_buffer_to_record_batches(false);
}
fn check_data_buffer_dedup(dedup: bool) {
let metadata = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(metadata.clone(), 10, dedup);
write_rows_to_buffer(
&mut buffer,
&metadata,
0,
vec![2, 3],
vec![Some(1.0), Some(2.0)],
0,
);
write_rows_to_buffer(
&mut buffer,
&metadata,
0,
vec![1, 2],
vec![Some(1.1), Some(2.1)],
2,
);
let mut reader = buffer.read(Some(&[0])).unwrap();
let mut res = vec![];
while reader.is_valid() {
let batch = reader.current_data_batch();
res.push(extract_data_batch(&batch));
reader.next().unwrap();
}
if dedup {
assert_eq!(vec![(0, vec![(1, 2), (2, 3), (3, 1)])], res);
} else {
assert_eq!(vec![(0, vec![(1, 2), (2, 3), (2, 0), (3, 1)])], res);
}
}
#[test]
fn test_data_buffer_dedup() {
check_data_buffer_dedup(true);
check_data_buffer_dedup(false);
}
#[test]
fn test_data_buffer_to_record_batches_with_dedup() {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
write_rows_to_buffer(&mut buffer, &meta, 0, vec![1, 2], vec![Some(0.1), None], 1);
write_rows_to_buffer(&mut buffer, &meta, 1, vec![2], vec![Some(1.1)], 2);
@@ -1026,7 +1104,7 @@ mod tests {
#[test]
fn test_data_buffer_to_record_batches_without_dedup() {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
write_rows_to_buffer(&mut buffer, &meta, 0, vec![1, 2], vec![Some(0.1), None], 1);
write_rows_to_buffer(&mut buffer, &meta, 1, vec![1, 2], vec![Some(1.1), None], 2);
@@ -1064,35 +1142,13 @@ mod tests {
);
}
fn write_rows_to_buffer(
buffer: &mut DataBuffer,
schema: &RegionMetadataRef,
pk_index: u16,
ts: Vec<i64>,
v0: Vec<Option<f64>>,
sequence: u64,
) {
let kvs = build_key_values_with_ts_seq_values(
schema,
"whatever".to_string(),
1,
ts.into_iter(),
v0.into_iter(),
sequence,
);
for kv in kvs.iter() {
buffer.write_row(pk_index, kv);
}
}
fn check_data_buffer_freeze(
pk_weights: Option<&[u16]>,
replace_pk_weights: bool,
expected: &[(u16, Vec<(i64, u64)>)],
) {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
// write rows with null values.
write_rows_to_buffer(
@@ -1113,21 +1169,7 @@ mod tests {
.unwrap();
while reader.is_valid() {
let batch = reader.current_data_batch();
let rb = batch.slice_record_batch();
let ts = timestamp_array_to_i64_slice(rb.column(1));
let sequence = rb
.column(2)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.values();
let ts_and_seq = ts
.iter()
.zip(sequence.iter())
.map(|(ts, seq)| (*ts, *seq))
.collect::<Vec<_>>();
res.push((batch.pk_index(), ts_and_seq));
res.push(extract_data_batch(&batch));
reader.next().unwrap();
}
assert_eq!(expected, res);
@@ -1163,7 +1205,7 @@ mod tests {
#[test]
fn test_encode_data_buffer() {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
// write rows with null values.
write_rows_to_buffer(
@@ -1181,7 +1223,7 @@ mod tests {
assert_eq!(4, buffer.num_rows());
let encoder = DataPartEncoder::new(&meta, Some(&[0, 1, 2]), None, true);
let encoder = DataPartEncoder::new(&meta, Some(&[0, 1, 2]), None, true, true);
let encoded = match encoder.write(&mut buffer).unwrap() {
DataPart::Parquet(data) => data.data,
};
@@ -1228,7 +1270,7 @@ mod tests {
fn check_iter_data_buffer(pk_weights: Option<&[u16]>, expected: &[Vec<f64>]) {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
write_rows_to_buffer(
&mut buffer,
@@ -1268,7 +1310,7 @@ mod tests {
#[test]
fn test_iter_empty_data_buffer() {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
let mut iter = buffer.read(Some(&[0, 1, 3, 2])).unwrap();
check_buffer_values_equal(&mut iter, &[]);
}
@@ -1294,7 +1336,7 @@ mod tests {
fn check_iter_data_part(weights: &[u16], expected_values: &[Vec<f64>]) {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
write_rows_to_buffer(
&mut buffer,
@@ -1323,7 +1365,7 @@ mod tests {
4,
);
let encoder = DataPartEncoder::new(&meta, Some(weights), Some(4), true);
let encoder = DataPartEncoder::new(&meta, Some(weights), Some(4), true, true);
let encoded = encoder.write(&mut buffer).unwrap();
let mut iter = encoded.read().unwrap();
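
Throughout this file the dedup flag has one meaning: among rows sharing a (pk_index, timestamp) pair, only the row with the greatest sequence survives, which is exactly what check_data_buffer_dedup verifies above. A self-contained sketch of that rule over toy (pk_index, ts, sequence) tuples, not the crate's builder-based implementation:

fn dedup_rows(mut rows: Vec<(u16, i64, u64)>) -> Vec<(u16, i64, u64)> {
    // Sort by pk asc, ts asc, sequence desc so the winning row of each
    // (pk, ts) group comes first.
    rows.sort_by_key(|r| (r.0, r.1, std::cmp::Reverse(r.2)));
    // Keep only the first row of each consecutive (pk, ts) group.
    rows.dedup_by_key(|r| (r.0, r.1));
    rows
}

fn main() {
    // Mirrors check_data_buffer_dedup(true): ts 2 is written at sequence 0
    // and again at sequence 3; the later write wins.
    let rows = vec![(0, 2, 0), (0, 3, 1), (0, 1, 2), (0, 2, 3)];
    assert_eq!(dedup_rows(rows), vec![(0, 1, 2), (0, 2, 3), (0, 3, 1)]);
}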

View File

@@ -0,0 +1,235 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Range;
use crate::error::Result;
use crate::memtable::merge_tree::data::DataBatch;
use crate::memtable::merge_tree::PkId;
pub trait DedupSource {
/// Returns whether the current source is still valid.
fn is_valid(&self) -> bool;
/// Advances the source to the next data batch.
fn next(&mut self) -> Result<()>;
/// Returns the current pk id.
/// # Panics
/// Panics if the source is not valid.
fn current_pk_id(&self) -> PkId;
/// Returns the current primary key bytes.
/// # Panics
/// Panics if the source is not valid.
fn current_key(&self) -> &[u8];
/// Returns the current data batch.
/// # Panics
/// Panics if the source is not valid.
fn current_data_batch(&self) -> DataBatch;
}
struct DedupReader<T> {
prev_batch_last_row: Option<(PkId, i64)>,
current_batch_range: Option<Range<usize>>,
inner: T,
}
impl<T: DedupSource> DedupReader<T> {
fn try_new(inner: T) -> Result<Self> {
let mut res = Self {
prev_batch_last_row: None,
current_batch_range: None,
inner,
};
res.next()?;
Ok(res)
}
fn is_valid(&self) -> bool {
self.current_batch_range.is_some()
}
/// Returns the current encoded primary key.
/// # Panics
/// Panics if the inner reader is exhausted.
fn current_key(&self) -> &[u8] {
self.inner.current_key()
}
fn current_data_batch(&self) -> DataBatch {
let range = self.current_batch_range.as_ref().unwrap();
let data_batch = self.inner.current_data_batch();
data_batch.slice(range.start, range.len())
}
fn next(&mut self) -> Result<()> {
loop {
match &mut self.prev_batch_last_row {
None => {
// First call: initialize prev_batch_last_row and current_batch_range from the first batch.
let current_batch = self.inner.current_data_batch();
let pk_id = self.inner.current_pk_id();
let (last_ts, _) = current_batch.last_row();
self.prev_batch_last_row = Some((pk_id, last_ts));
self.current_batch_range = Some(0..current_batch.num_rows());
break;
}
Some(prev_last_row) => {
self.inner.next()?;
if !self.inner.is_valid() {
// Resets current_batch_range if inner reader is exhausted.
self.current_batch_range = None;
break;
}
let current_batch = self.inner.current_data_batch();
let current_pk_id = self.inner.current_pk_id();
let (first_ts, _) = current_batch.first_row();
let rows_in_batch = current_batch.num_rows();
let (start, end) = if &(current_pk_id, first_ts) == prev_last_row {
// The first row in this batch duplicates the last row in the previous batch.
if rows_in_batch == 1 {
// The batch holds only the duplicate row; advance to the next batch.
continue;
} else {
// Skip the first row, start from offset 1.
(1, rows_in_batch)
}
} else {
// No duplicates found, yield whole batch.
(0, rows_in_batch)
};
let (last_ts, _) = current_batch.last_row();
*prev_last_row = (current_pk_id, last_ts);
self.current_batch_range = Some(start..end);
break;
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use store_api::metadata::RegionMetadataRef;
use super::*;
use crate::memtable::merge_tree::data::{
write_rows_to_buffer, DataBuffer, DataParts, DataPartsReader,
};
use crate::test_util::memtable_util::{extract_data_batch, metadata_for_test};
impl DedupSource for DataPartsReader {
fn is_valid(&self) -> bool {
self.is_valid()
}
fn next(&mut self) -> Result<()> {
self.next()
}
fn current_pk_id(&self) -> PkId {
PkId {
shard_id: 0,
pk_index: self.current_data_batch().pk_index(),
}
}
fn current_key(&self) -> &[u8] {
b"abcf"
}
fn current_data_batch(&self) -> DataBatch {
self.current_data_batch()
}
}
fn build_data_buffer(
meta: RegionMetadataRef,
rows: Vec<(u16, Vec<i64>)>,
seq: &mut u64,
) -> DataBuffer {
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
for row in rows {
let (pk_index, timestamps) = row;
let num_rows = timestamps.len() as u64;
let v = timestamps.iter().map(|v| Some(*v as f64)).collect();
write_rows_to_buffer(&mut buffer, &meta, pk_index, timestamps, v, *seq);
*seq += num_rows;
}
buffer
}
fn check_data_parts_reader_dedup(
parts: Vec<Vec<(u16, Vec<i64>)>>,
expected: Vec<(u16, Vec<(i64, u64)>)>,
) {
let meta = metadata_for_test();
let mut seq = 0;
let mut frozens = Vec::with_capacity(parts.len());
for part in parts {
let mut buffer1 = build_data_buffer(meta.clone(), part, &mut seq);
let part1 = buffer1.freeze(None, false).unwrap();
frozens.push(part1);
}
let mut parts = DataParts::new(meta, 10, true).with_frozen(frozens);
let mut res = Vec::with_capacity(expected.len());
let mut reader = DedupReader::try_new(parts.read().unwrap()).unwrap();
while reader.is_valid() {
let batch = reader.current_data_batch();
res.push(extract_data_batch(&batch));
reader.next().unwrap();
}
assert_eq!(expected, res);
}
#[test]
fn test_data_parts_reader_dedup() {
check_data_parts_reader_dedup(vec![vec![(0, vec![1, 2])]], vec![(0, vec![(1, 0), (2, 1)])]);
check_data_parts_reader_dedup(
vec![
vec![(0, vec![1, 2])],
vec![(0, vec![1, 2])],
vec![(0, vec![2, 3])],
],
vec![(0, vec![(1, 2)]), (0, vec![(2, 4)]), (0, vec![(3, 5)])],
);
check_data_parts_reader_dedup(
vec![vec![(0, vec![1])], vec![(0, vec![2])], vec![(0, vec![3])]],
vec![(0, vec![(1, 0)]), (0, vec![(2, 1)]), (0, vec![(3, 2)])],
);
check_data_parts_reader_dedup(
vec![vec![(0, vec![1])], vec![(0, vec![1])], vec![(0, vec![1])]],
vec![(0, vec![(1, 2)])],
);
check_data_parts_reader_dedup(
vec![vec![(0, vec![1])], vec![(1, vec![1])], vec![(2, vec![1])]],
vec![(0, vec![(1, 0)]), (1, vec![(1, 1)]), (2, vec![(1, 2)])],
);
}
}
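
DedupReader relies on an ordering invariant of its source: batches arrive sorted so that a duplicate (pk, timestamp) can only surface as the first row of a batch matching the last row of the previous batch, which is why remembering a single (PkId, last_ts) pair suffices. A standalone sketch of that boundary rule over toy (pk_index, ts) batches (duplicates already ordered so the surviving row comes first; an illustration, not the crate's code):

fn dedup_batches(batches: Vec<Vec<(u16, i64)>>) -> Vec<Vec<(u16, i64)>> {
    let mut prev_last: Option<(u16, i64)> = None;
    let mut out = Vec::new();
    for batch in batches {
        let Some(&last) = batch.last() else { continue };
        // Trim the leading row if it duplicates the previous batch's last row.
        let start = if prev_last == Some(batch[0]) { 1 } else { 0 };
        prev_last = Some(last);
        if start < batch.len() {
            out.push(batch[start..].to_vec());
        }
        // A batch whose only row was the duplicate contributes nothing,
        // mirroring the `continue` branch in DedupReader::next above.
    }
    out
}

fn main() {
    // The duplicate ts=2 row at the batch boundary is trimmed, as in
    // test_data_parts_reader_dedup.
    let batches = vec![vec![(0, 1), (0, 2)], vec![(0, 2), (0, 3)]];
    assert_eq!(dedup_batches(batches), vec![vec![(0, 1), (0, 2)], vec![(0, 3)]]);
}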

View File

@@ -357,7 +357,7 @@ mod tests {
#[test]
fn test_merger() {
let metadata = metadata_for_test();
let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10);
let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
let weight = &[2, 1, 0];
let mut seq = 0;
write_rows_to_buffer(&mut buffer1, &metadata, 1, vec![2, 3], &mut seq);
@@ -366,7 +366,7 @@ mod tests {
buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
));
let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10);
let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![3], &mut seq);
write_rows_to_buffer(&mut buffer2, &metadata, 0, vec![1], &mut seq);
let node2 = DataNode::new(DataSource::Part(
@@ -388,7 +388,7 @@ mod tests {
#[test]
fn test_merger2() {
let metadata = metadata_for_test();
let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10);
let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
let weight = &[2, 1, 0];
let mut seq = 0;
write_rows_to_buffer(&mut buffer1, &metadata, 1, vec![2, 3], &mut seq);
@@ -397,13 +397,13 @@ mod tests {
buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
));
let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10);
let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![3], &mut seq);
let node2 = DataNode::new(DataSource::Part(
buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
));
let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10);
let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10, true);
write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq);
let node3 = DataNode::new(DataSource::Part(
buffer3.freeze(Some(weight), true).unwrap().read().unwrap(),
@@ -426,7 +426,7 @@ mod tests {
#[test]
fn test_merger_overlapping() {
let metadata = metadata_for_test();
let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10);
let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
let weight = &[0, 1, 2];
let mut seq = 0;
write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 3], &mut seq);
@@ -434,13 +434,13 @@ mod tests {
buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
));
let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10);
let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq);
let node2 = DataNode::new(DataSource::Part(
buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
));
let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10);
let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10, true);
write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq);
let node3 = DataNode::new(DataSource::Part(
buffer3.freeze(Some(weight), true).unwrap().read().unwrap(),
@@ -462,19 +462,19 @@ mod tests {
#[test]
fn test_merger_parts_and_buffer() {
let metadata = metadata_for_test();
let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10);
let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
let weight = &[0, 1, 2];
let mut seq = 0;
write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 3], &mut seq);
let node1 = DataNode::new(DataSource::Buffer(buffer1.read(Some(weight)).unwrap()));
let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10);
let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq);
let node2 = DataNode::new(DataSource::Part(
buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
));
let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10);
let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10, true);
write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq);
let node3 = DataNode::new(DataSource::Part(
buffer3.freeze(Some(weight), true).unwrap().read().unwrap(),
@@ -496,7 +496,7 @@ mod tests {
#[test]
fn test_merger_overlapping_2() {
let metadata = metadata_for_test();
let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10);
let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
let weight = &[0, 1, 2];
let mut seq = 0;
write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 2], &mut seq);
@@ -504,13 +504,13 @@ mod tests {
buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
));
let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10);
let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
write_rows_to_buffer(&mut buffer2, &metadata, 0, vec![2], &mut seq);
let node2 = DataNode::new(DataSource::Part(
buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
));
let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10);
let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10, true);
write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2], &mut seq);
let node3 = DataNode::new(DataSource::Part(
buffer3.freeze(Some(weight), true).unwrap().read().unwrap(),
@@ -530,7 +530,7 @@ mod tests {
#[test]
fn test_merger_overlapping_3() {
let metadata = metadata_for_test();
let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10);
let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
let weight = &[0, 1, 2];
let mut seq = 0;
write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![0, 1], &mut seq);
@@ -538,7 +538,7 @@ mod tests {
buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
));
let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10);
let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
write_rows_to_buffer(&mut buffer2, &metadata, 0, vec![1], &mut seq);
let node2 = DataNode::new(DataSource::Part(
buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),

View File

@@ -46,7 +46,7 @@ impl Partition {
let shard_builder = ShardBuilder::new(metadata.clone(), config);
Partition {
inner: RwLock::new(Inner::new(metadata, shard_builder)),
inner: RwLock::new(Inner::new(metadata, shard_builder, config.dedup)),
}
}
@@ -128,6 +128,7 @@ impl Partition {
active_shard_id: inner.active_shard_id,
shards,
num_rows: 0,
dedup: config.dedup,
}),
}
}
@@ -194,21 +195,23 @@ struct Inner {
/// Shards with frozen dictionary.
shards: Vec<Shard>,
num_rows: usize,
dedup: bool,
}
impl Inner {
fn new(metadata: RegionMetadataRef, shard_builder: ShardBuilder) -> Self {
fn new(metadata: RegionMetadataRef, shard_builder: ShardBuilder, dedup: bool) -> Self {
let mut inner = Self {
metadata,
shard_builder,
active_shard_id: 0,
shards: Vec::new(),
num_rows: 0,
dedup,
};
if inner.metadata.primary_key.is_empty() {
let data_parts = DataParts::new(inner.metadata.clone(), DATA_INIT_CAP);
inner.shards.push(Shard::new(0, None, data_parts));
let data_parts = DataParts::new(inner.metadata.clone(), DATA_INIT_CAP, dedup);
inner.shards.push(Shard::new(0, None, data_parts, dedup));
inner.active_shard_id = 1;
}

View File

@@ -28,15 +28,22 @@ pub struct Shard {
key_dict: Option<KeyDictRef>,
/// Data in the shard.
data_parts: DataParts,
dedup: bool,
}
impl Shard {
/// Returns a new shard.
pub fn new(shard_id: ShardId, key_dict: Option<KeyDictRef>, data_parts: DataParts) -> Shard {
pub fn new(
shard_id: ShardId,
key_dict: Option<KeyDictRef>,
data_parts: DataParts,
dedup: bool,
) -> Shard {
Shard {
shard_id,
key_dict,
data_parts,
dedup,
}
}
@@ -77,7 +84,8 @@ impl Shard {
Shard {
shard_id: self.shard_id,
key_dict: self.key_dict.clone(),
data_parts: DataParts::new(metadata, DATA_INIT_CAP),
data_parts: DataParts::new(metadata, DATA_INIT_CAP, self.dedup),
dedup: self.dedup,
}
}
}
@@ -144,9 +152,9 @@ mod tests {
}
let dict = dict_builder.finish().unwrap();
let data_parts = DataParts::new(metadata, DATA_INIT_CAP);
let data_parts = DataParts::new(metadata, DATA_INIT_CAP, true);
Shard::new(shard_id, Some(Arc::new(dict)), data_parts)
Shard::new(shard_id, Some(Arc::new(dict)), data_parts, true)
}
#[test]

View File

@@ -38,15 +38,18 @@ pub struct ShardBuilder {
data_buffer: DataBuffer,
/// Number of rows to freeze a data part.
data_freeze_threshold: usize,
dedup: bool,
}
impl ShardBuilder {
/// Returns a new builder.
pub fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> ShardBuilder {
let dedup = config.dedup;
ShardBuilder {
dict_builder: KeyDictBuilder::new(config.index_max_keys_per_shard),
data_buffer: DataBuffer::with_capacity(metadata, DATA_INIT_CAP),
data_buffer: DataBuffer::with_capacity(metadata, DATA_INIT_CAP, dedup),
data_freeze_threshold: config.data_freeze_threshold,
dedup,
}
}
@@ -87,10 +90,11 @@ impl ShardBuilder {
};
// build data parts.
let data_parts = DataParts::new(metadata, DATA_INIT_CAP).with_frozen(vec![data_part]);
let data_parts =
DataParts::new(metadata, DATA_INIT_CAP, self.dedup).with_frozen(vec![data_part]);
let key_dict = key_dict.map(Arc::new);
Ok(Some(Shard::new(shard_id, key_dict, data_parts)))
Ok(Some(Shard::new(shard_id, key_dict, data_parts, self.dedup)))
}
/// Scans the shard builder.
@@ -165,9 +169,9 @@ mod tests {
}
let dict = dict_builder.finish().unwrap();
let data_parts = DataParts::new(metadata, DATA_INIT_CAP);
let data_parts = DataParts::new(metadata, DATA_INIT_CAP, true);
Shard::new(shard_id, Some(Arc::new(dict)), data_parts)
Shard::new(shard_id, Some(Arc::new(dict)), data_parts, true)
}
#[test]

View File

@@ -20,6 +20,7 @@ use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::value::ValueData;
use api::v1::{Row, Rows, SemanticType};
use datatypes::arrow::array::UInt64Array;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::ValueRef;
@@ -29,6 +30,7 @@ use table::predicate::Predicate;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer};
use crate::memtable::{
BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRef,
MemtableStats,
@@ -177,6 +179,46 @@ pub(crate) fn build_key_values(
)
}
pub(crate) fn write_rows_to_buffer(
buffer: &mut DataBuffer,
schema: &RegionMetadataRef,
pk_index: u16,
ts: Vec<i64>,
v0: Vec<Option<f64>>,
sequence: u64,
) {
let kvs = crate::test_util::memtable_util::build_key_values_with_ts_seq_values(
schema,
"whatever".to_string(),
1,
ts.into_iter(),
v0.into_iter(),
sequence,
);
for kv in kvs.iter() {
buffer.write_row(pk_index, kv);
}
}
/// Extracts pk index, timestamps and sequences from [DataBatch].
pub(crate) fn extract_data_batch(batch: &DataBatch) -> (u16, Vec<(i64, u64)>) {
let rb = batch.slice_record_batch();
let ts = timestamp_array_to_i64_slice(rb.column(1));
let seq = rb
.column(2)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.values();
let ts_and_seq = ts
.iter()
.zip(seq.iter())
.map(|(ts, seq)| (*ts, *seq))
.collect::<Vec<_>>();
(batch.pk_index(), ts_and_seq)
}
/// Builds key values with timestamps (ms) and sequences for test.
pub(crate) fn build_key_values_with_ts_seq_values(
schema: &RegionMetadataRef,