diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs
index dd0511b1ec..d075a9deb3 100644
--- a/src/mito2/src/memtable/merge_tree.rs
+++ b/src/mito2/src/memtable/merge_tree.rs
@@ -14,7 +14,8 @@

 //! Memtable implementation based on a merge tree.

-mod data;
+pub(crate) mod data;
+mod dedup;
 mod dict;
 mod merger;
 mod metrics;
@@ -59,6 +60,8 @@ pub struct MergeTreeConfig {
     pub index_max_keys_per_shard: usize,
     /// Number of rows to freeze a data part.
     pub data_freeze_threshold: usize,
+    /// Whether to delete duplicate rows.
+    pub dedup: bool,
 }

 impl Default for MergeTreeConfig {
@@ -66,6 +69,7 @@ impl Default for MergeTreeConfig {
         Self {
             index_max_keys_per_shard: 8192,
             data_freeze_threshold: 102400,
+            dedup: true,
         }
     }
 }
diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs
index 9497791d2d..418784fc71 100644
--- a/src/mito2/src/memtable/merge_tree/data.rs
+++ b/src/mito2/src/memtable/merge_tree/data.rs
@@ -63,11 +63,11 @@ pub(crate) struct DataBatchRange {

 impl DataBatchRange {
     pub(crate) fn len(&self) -> usize {
-        (self.start..self.end).len()
+        self.end - self.start
     }

     pub(crate) fn is_empty(&self) -> bool {
-        (self.start..self.end).is_empty()
+        self.len() == 0
     }
 }
@@ -163,6 +163,10 @@ impl<'a> DataBatch<'a> {
             },
         }
     }
+
+    pub(crate) fn num_rows(&self) -> usize {
+        self.range.len()
+    }
 }

 /// Buffer for the value part (pk_index, ts, sequence, op_type, field columns) in a shard.
@@ -180,11 +184,13 @@ pub struct DataBuffer {
     op_type_builder: UInt8VectorBuilder,
     /// Builders for field columns.
     field_builders: Vec<LazyMutableVectorBuilder>,
+
+    dedup: bool,
 }

 impl DataBuffer {
     /// Creates a `DataBuffer` instance with given schema and capacity.
-    pub fn with_capacity(metadata: RegionMetadataRef, init_capacity: usize) -> Self {
+    pub fn with_capacity(metadata: RegionMetadataRef, init_capacity: usize, dedup: bool) -> Self {
         let ts_builder = metadata
             .time_index_column()
             .column_schema
@@ -209,6 +215,7 @@ impl DataBuffer {
             sequence_builder,
             op_type_builder,
             field_builders,
+            dedup,
         }
     }
@@ -237,7 +244,13 @@ impl DataBuffer {
         pk_weights: Option<&[u16]>,
         replace_pk_index: bool,
     ) -> Result<DataPart> {
-        let encoder = DataPartEncoder::new(&self.metadata, pk_weights, None, replace_pk_index);
+        let encoder = DataPartEncoder::new(
+            &self.metadata,
+            pk_weights,
+            None,
+            replace_pk_index,
+            self.dedup,
+        );
         let parts = encoder.write(self)?;
         Ok(parts)
     }
@@ -246,13 +259,12 @@ impl DataBuffer {
     /// If pk_weights is present, yielded rows are sorted according to weights,
     /// otherwise rows are sorted by "pk_weights" values as they are actually weights.
     pub fn read(&mut self, pk_weights: Option<&[u16]>) -> Result<DataBufferReader> {
-        // todo(hl): control whether to dedup while invoking `read`.
         let batch = data_buffer_to_record_batches(
             self.data_part_schema.clone(),
             self,
             pk_weights,
             true,
-            true,
+            self.dedup,
             // replace_pk_index is always set to false since:
             // - for DataBuffer in ShardBuilder, pk dict is not frozen
             // - for DataBuffer in Shard, values in pk_index column has already been replaced during `freeze`.
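The hunks above thread a single `dedup` flag from `MergeTreeConfig` down into `DataBuffer`, replacing the hard-coded `true` that was previously passed to `data_buffer_to_record_batches`. A minimal stand-alone sketch of the pattern; the types below are simplified stand-ins, not the mito2 definitions:

```rust
// Simplified stand-ins for MergeTreeConfig/DataBuffer; only the flag
// threading is modeled here.
#[derive(Clone)]
struct Config {
    dedup: bool,
}

impl Default for Config {
    // Mirrors the patch: dedup defaults to true.
    fn default() -> Self {
        Self { dedup: true }
    }
}

struct Buffer {
    rows: Vec<(i64, u64)>, // (ts, seq)
    dedup: bool,           // captured once at construction, like DataBuffer
}

impl Buffer {
    fn with_capacity(cap: usize, dedup: bool) -> Self {
        Self {
            rows: Vec::with_capacity(cap),
            dedup,
        }
    }
}

fn main() {
    let config = Config::default();
    // Call sites no longer decide per read; they just forward the flag.
    let buffer = Buffer::with_capacity(8, config.dedup);
    assert!(buffer.dedup && buffer.rows.is_empty());
}
```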
@@ -629,6 +641,7 @@ struct DataPartEncoder<'a> {
     pk_weights: Option<&'a [u16]>,
     row_group_size: Option<usize>,
     replace_pk_index: bool,
+    dedup: bool,
 }

 impl<'a> DataPartEncoder<'a> {
@@ -637,6 +650,7 @@ impl<'a> DataPartEncoder<'a> {
         pk_weights: Option<&'a [u16]>,
         row_group_size: Option<usize>,
         replace_pk_index: bool,
+        dedup: bool,
     ) -> DataPartEncoder<'a> {
         let schema = memtable_schema_to_encoded_schema(metadata);
         Self {
@@ -644,6 +658,7 @@ impl<'a> DataPartEncoder<'a> {
             pk_weights,
             row_group_size,
             replace_pk_index,
+            dedup,
         }
     }
@@ -663,7 +678,7 @@ impl<'a> DataPartEncoder<'a> {
             source,
             self.pk_weights,
             false,
-            true,
+            self.dedup,
             self.replace_pk_index,
         )?;
         writer.write(&rb).context(error::EncodeMemtableSnafu)?;
@@ -803,9 +818,9 @@ pub struct DataParts {
 }

 impl DataParts {
-    pub(crate) fn new(metadata: RegionMetadataRef, capacity: usize) -> Self {
+    pub(crate) fn new(metadata: RegionMetadataRef, capacity: usize, dedup: bool) -> Self {
         Self {
-            active: DataBuffer::with_capacity(metadata, capacity),
+            active: DataBuffer::with_capacity(metadata, capacity, dedup),
             frozen: Vec::new(),
         }
     }
@@ -868,6 +883,29 @@ impl DataPartsReader {
     }
 }

+#[cfg(test)]
+pub(crate) fn write_rows_to_buffer(
+    buffer: &mut DataBuffer,
+    schema: &RegionMetadataRef,
+    pk_index: u16,
+    ts: Vec<i64>,
+    v0: Vec<Option<f64>>,
+    sequence: u64,
+) {
+    let kvs = crate::test_util::memtable_util::build_key_values_with_ts_seq_values(
+        schema,
+        "whatever".to_string(),
+        1,
+        ts.into_iter(),
+        v0.into_iter(),
+        sequence,
+    );
+
+    for kv in kvs.iter() {
+        buffer.write_row(pk_index, kv);
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use datafusion::arrow::array::Float64Array;
@@ -876,7 +914,7 @@ mod tests {
     use parquet::data_type::AsBytes;

     use super::*;
-    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
+    use crate::test_util::memtable_util::{extract_data_batch, metadata_for_test};

     #[test]
     fn test_lazy_mutable_vector_builder() {
@@ -900,7 +938,7 @@ mod tests {

     fn check_test_data_buffer_to_record_batches(keep_data: bool) {
         let meta = metadata_for_test();
-        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

         write_rows_to_buffer(&mut buffer, &meta, 0, vec![1, 2], vec![Some(0.1), None], 1);
         write_rows_to_buffer(&mut buffer, &meta, 1, vec![1, 2], vec![Some(1.1), None], 2);
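The `check_data_buffer_dedup` test added in the hunk below pins down the intended semantics: rows come out sorted by timestamp ascending and sequence descending, and dedup keeps only the first (newest) row per timestamp, i.e. last-write-wins. A hedged stand-alone sketch of that ordering on plain tuples, illustrating the expected values rather than the actual reader code:

```rust
// Rows as (ts, seq); the pk is omitted since the test writes a single pk.
// Mirrors the writes in check_data_buffer_dedup: (2,0),(3,1) then (1,2),(2,3).
fn main() {
    let mut rows = vec![(2i64, 0u64), (3, 1), (1, 2), (2, 3)];
    // Sort by ts ascending, then seq descending, so the newest duplicate
    // comes first within each timestamp group.
    rows.sort_by(|a, b| (a.0, b.1).cmp(&(b.0, a.1)));
    assert_eq!(rows, vec![(1, 2), (2, 3), (2, 0), (3, 1)]); // dedup = false
    // Deduping keeps the first (newest) row per timestamp.
    rows.dedup_by_key(|r| r.0);
    assert_eq!(rows, vec![(1, 2), (2, 3), (3, 1)]); // dedup = true
}
```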
@@ -968,10 +1006,50 @@ mod tests {
         check_test_data_buffer_to_record_batches(false);
     }

+    fn check_data_buffer_dedup(dedup: bool) {
+        let metadata = metadata_for_test();
+        let mut buffer = DataBuffer::with_capacity(metadata.clone(), 10, dedup);
+        write_rows_to_buffer(
+            &mut buffer,
+            &metadata,
+            0,
+            vec![2, 3],
+            vec![Some(1.0), Some(2.0)],
+            0,
+        );
+        write_rows_to_buffer(
+            &mut buffer,
+            &metadata,
+            0,
+            vec![1, 2],
+            vec![Some(1.1), Some(2.1)],
+            2,
+        );
+
+        let mut reader = buffer.read(Some(&[0])).unwrap();
+        let mut res = vec![];
+        while reader.is_valid() {
+            let batch = reader.current_data_batch();
+            res.push(extract_data_batch(&batch));
+            reader.next().unwrap();
+        }
+        if dedup {
+            assert_eq!(vec![(0, vec![(1, 2), (2, 3), (3, 1)])], res);
+        } else {
+            assert_eq!(vec![(0, vec![(1, 2), (2, 3), (2, 0), (3, 1)])], res);
+        }
+    }
+
+    #[test]
+    fn test_data_buffer_dedup() {
+        check_data_buffer_dedup(true);
+        check_data_buffer_dedup(false);
+    }
+
     #[test]
     fn test_data_buffer_to_record_batches_with_dedup() {
         let meta = metadata_for_test();
-        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

         write_rows_to_buffer(&mut buffer, &meta, 0, vec![1, 2], vec![Some(0.1), None], 1);
         write_rows_to_buffer(&mut buffer, &meta, 1, vec![2], vec![Some(1.1)], 2);
@@ -1026,7 +1104,7 @@ mod tests {
     #[test]
     fn test_data_buffer_to_record_batches_without_dedup() {
         let meta = metadata_for_test();
-        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

         write_rows_to_buffer(&mut buffer, &meta, 0, vec![1, 2], vec![Some(0.1), None], 1);
         write_rows_to_buffer(&mut buffer, &meta, 1, vec![1, 2], vec![Some(1.1), None], 2);
@@ -1064,35 +1142,13 @@ mod tests {
         );
     }

-    fn write_rows_to_buffer(
-        buffer: &mut DataBuffer,
-        schema: &RegionMetadataRef,
-        pk_index: u16,
-        ts: Vec<i64>,
-        v0: Vec<Option<f64>>,
-        sequence: u64,
-    ) {
-        let kvs = build_key_values_with_ts_seq_values(
-            schema,
-            "whatever".to_string(),
-            1,
-            ts.into_iter(),
-            v0.into_iter(),
-            sequence,
-        );
-
-        for kv in kvs.iter() {
-            buffer.write_row(pk_index, kv);
-        }
-    }
-
     fn check_data_buffer_freeze(
         pk_weights: Option<&[u16]>,
         replace_pk_weights: bool,
         expected: &[(u16, Vec<(i64, u64)>)],
     ) {
         let meta = metadata_for_test();
-        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

         // write rows with null values.
         write_rows_to_buffer(
@@ -1113,21 +1169,7 @@ mod tests {
             .unwrap();
         while reader.is_valid() {
             let batch = reader.current_data_batch();
-            let rb = batch.slice_record_batch();
-            let ts = timestamp_array_to_i64_slice(rb.column(1));
-            let sequence = rb
-                .column(2)
-                .as_any()
-                .downcast_ref::<UInt64Array>()
-                .unwrap()
-                .values();
-            let ts_and_seq = ts
-                .iter()
-                .zip(sequence.iter())
-                .map(|(ts, seq)| (*ts, *seq))
-                .collect::<Vec<_>>();
-            res.push((batch.pk_index(), ts_and_seq));
-
+            res.push(extract_data_batch(&batch));
             reader.next().unwrap();
         }
         assert_eq!(expected, res);
@@ -1163,7 +1205,7 @@ mod tests {
     #[test]
     fn test_encode_data_buffer() {
         let meta = metadata_for_test();
-        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

         // write rows with null values.
         write_rows_to_buffer(
@@ -1181,7 +1223,7 @@ mod tests {

         assert_eq!(4, buffer.num_rows());

-        let encoder = DataPartEncoder::new(&meta, Some(&[0, 1, 2]), None, true);
+        let encoder = DataPartEncoder::new(&meta, Some(&[0, 1, 2]), None, true, true);
         let encoded = match encoder.write(&mut buffer).unwrap() {
             DataPart::Parquet(data) => data.data,
         };
@@ -1228,7 +1270,7 @@ mod tests {

     fn check_iter_data_buffer(pk_weights: Option<&[u16]>, expected: &[Vec<f64>]) {
         let meta = metadata_for_test();
-        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

         write_rows_to_buffer(
             &mut buffer,
@@ -1268,7 +1310,7 @@ mod tests {
     #[test]
     fn test_iter_empty_data_buffer() {
         let meta = metadata_for_test();
-        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
         let mut iter = buffer.read(Some(&[0, 1, 3, 2])).unwrap();
         check_buffer_values_equal(&mut iter, &[]);
     }
@@ -1294,7 +1336,7 @@ mod tests {

     fn check_iter_data_part(weights: &[u16], expected_values: &[Vec<f64>]) {
         let meta = metadata_for_test();
-        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

         write_rows_to_buffer(
             &mut buffer,
@@ -1323,7 +1365,7 @@ mod tests {
             4,
         );

-        let encoder = DataPartEncoder::new(&meta, Some(weights), Some(4), true);
+        let encoder = DataPartEncoder::new(&meta, Some(weights), Some(4), true, true);
         let encoded = encoder.write(&mut buffer).unwrap();

         let mut iter = encoded.read().unwrap();
diff --git a/src/mito2/src/memtable/merge_tree/dedup.rs b/src/mito2/src/memtable/merge_tree/dedup.rs
new file mode 100644
index 0000000000..889db134de
--- /dev/null
+++ b/src/mito2/src/memtable/merge_tree/dedup.rs
@@ -0,0 +1,235 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::ops::Range;
+
+use crate::error::Result;
+use crate::memtable::merge_tree::data::DataBatch;
+use crate::memtable::merge_tree::PkId;
+
+pub trait DedupSource {
+    /// Returns whether current source is still valid.
+    fn is_valid(&self) -> bool;
+
+    /// Advances source to next data batch.
+    fn next(&mut self) -> Result<()>;
+
+    /// Returns current pk id.
+    /// # Panics
+    /// If source is not valid.
+    fn current_pk_id(&self) -> PkId;
+
+    /// Returns the current primary key bytes.
+    /// # Panics
+    /// If source is not valid.
+    fn current_key(&self) -> &[u8];
+
+    /// Returns the current data batch.
+    /// # Panics
+    /// If source is not valid.
+    fn current_data_batch(&self) -> DataBatch;
+}
+
+struct DedupReader<T> {
+    prev_batch_last_row: Option<(PkId, i64)>,
+    current_batch_range: Option<Range<usize>>,
+    inner: T,
+}
+
+impl<T: DedupSource> DedupReader<T> {
+    fn try_new(inner: T) -> Result<Self> {
+        let mut res = Self {
+            prev_batch_last_row: None,
+            current_batch_range: None,
+            inner,
+        };
+        res.next()?;
+        Ok(res)
+    }
+
+    fn is_valid(&self) -> bool {
+        self.current_batch_range.is_some()
+    }
+
+    /// Returns current encoded primary key.
+    /// # Panics
+    /// If inner reader is exhausted.
+    fn current_key(&self) -> &[u8] {
+        self.inner.current_key()
+    }
+
+    fn current_data_batch(&self) -> DataBatch {
+        let range = self.current_batch_range.as_ref().unwrap();
+        let data_batch = self.inner.current_data_batch();
+        data_batch.slice(range.start, range.len())
+    }
+
+    fn next(&mut self) -> Result<()> {
+        loop {
+            match &mut self.prev_batch_last_row {
+                None => {
+                    // First shot, fill prev_batch_last_row and current_batch_range with the first batch.
+                    let current_batch = self.inner.current_data_batch();
+                    let pk_id = self.inner.current_pk_id();
+                    let (last_ts, _) = current_batch.last_row();
+                    self.prev_batch_last_row = Some((pk_id, last_ts));
+                    self.current_batch_range = Some(0..current_batch.num_rows());
+                    break;
+                }
+                Some(prev_last_row) => {
+                    self.inner.next()?;
+                    if !self.inner.is_valid() {
+                        // Resets current_batch_range if inner reader is exhausted.
+                        self.current_batch_range = None;
+                        break;
+                    }
+                    let current_batch = self.inner.current_data_batch();
+                    let current_pk_id = self.inner.current_pk_id();
+                    let (first_ts, _) = current_batch.first_row();
+                    let rows_in_batch = current_batch.num_rows();
+
+                    let (start, end) = if &(current_pk_id, first_ts) == prev_last_row {
+                        // First row in this batch duplicates the last row in the previous batch.
+                        if rows_in_batch == 1 {
+                            // The batch contains only the duplicated row; move to the next batch.
+                            continue;
+                        } else {
+                            // Skip the first row, start from offset 1.
+                            (1, rows_in_batch)
+                        }
+                    } else {
+                        // No duplicates found, yield the whole batch.
+                        (0, rows_in_batch)
+                    };
+
+                    let (last_ts, _) = current_batch.last_row();
+                    *prev_last_row = (current_pk_id, last_ts);
+                    self.current_batch_range = Some(start..end);
+                    break;
+                }
+            }
+        }
+        Ok(())
+    }
+}
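`DedupReader::next` above only compares rows across batch boundaries: batches coming out of the merger are internally unique per (pk, ts), so checking the first row of each incoming batch against the last row already yielded is enough. A stand-alone sketch of the same boundary logic on plain tuples; the types are simplified stand-ins, and since duplicates reach the reader ordered by decreasing sequence, keeping the first occurrence amounts to last-write-wins:

```rust
use std::ops::Range;

// Each batch is a sorted, internally unique run of (pk_index, ts) rows.
// Yields the sub-range of every batch that survives dedup; a single-row
// batch that duplicates the previous tail is skipped entirely, mirroring
// the `continue` branch in DedupReader::next.
fn dedup_ranges(batches: &[Vec<(u16, i64)>]) -> Vec<Range<usize>> {
    let mut prev_last: Option<(u16, i64)> = None;
    let mut ranges = Vec::new();
    for batch in batches {
        // Skip the first row if it equals the last row already yielded.
        let start = if prev_last.is_some() && batch.first().copied() == prev_last {
            1
        } else {
            0
        };
        if start < batch.len() {
            ranges.push(start..batch.len());
        }
        prev_last = batch.last().copied();
    }
    ranges
}

fn main() {
    // Mirrors test_data_parts_reader_dedup: three identical single-row
    // batches collapse into the first one the reader sees (the one with
    // the highest sequence).
    let batches = vec![vec![(0u16, 1i64)], vec![(0, 1)], vec![(0, 1)]];
    assert_eq!(dedup_ranges(&batches), vec![0..1]);

    // Overlap at a batch boundary only trims the duplicated first row.
    let batches = vec![vec![(0u16, 1i64), (0, 2)], vec![(0, 2), (0, 3)]];
    assert_eq!(dedup_ranges(&batches), vec![0..2, 1..2]);
}
```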
+
+#[cfg(test)]
+mod tests {
+    use store_api::metadata::RegionMetadataRef;
+
+    use super::*;
+    use crate::memtable::merge_tree::data::{
+        write_rows_to_buffer, DataBuffer, DataParts, DataPartsReader,
+    };
+    use crate::test_util::memtable_util::{extract_data_batch, metadata_for_test};
+
+    impl DedupSource for DataPartsReader {
+        fn is_valid(&self) -> bool {
+            self.is_valid()
+        }
+
+        fn next(&mut self) -> Result<()> {
+            self.next()
+        }
+
+        fn current_pk_id(&self) -> PkId {
+            PkId {
+                shard_id: 0,
+                pk_index: self.current_data_batch().pk_index(),
+            }
+        }
+
+        fn current_key(&self) -> &[u8] {
+            b"abcf"
+        }
+
+        fn current_data_batch(&self) -> DataBatch {
+            self.current_data_batch()
+        }
+    }
+
+    fn build_data_buffer(
+        meta: RegionMetadataRef,
+        rows: Vec<(u16, Vec<i64>)>,
+        seq: &mut u64,
+    ) -> DataBuffer {
+        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
+
+        for row in rows {
+            let (pk_index, timestamps) = row;
+            let num_rows = timestamps.len() as u64;
+            let v = timestamps.iter().map(|v| Some(*v as f64)).collect();
+
+            write_rows_to_buffer(&mut buffer, &meta, pk_index, timestamps, v, *seq);
+            *seq += num_rows;
+        }
+        buffer
+    }
+
+    fn check_data_parts_reader_dedup(
+        parts: Vec<Vec<(u16, Vec<i64>)>>,
+        expected: Vec<(u16, Vec<(i64, u64)>)>,
+    ) {
+        let meta = metadata_for_test();
+        let mut seq = 0;
+
+        let mut frozens = Vec::with_capacity(parts.len());
+        for part in parts {
+            let mut buffer1 = build_data_buffer(meta.clone(), part, &mut seq);
+            let part1 = buffer1.freeze(None, false).unwrap();
+            frozens.push(part1);
+        }
+
+        let mut parts = DataParts::new(meta, 10, true).with_frozen(frozens);
+
+        let mut res = Vec::with_capacity(expected.len());
+        let mut reader = DedupReader::try_new(parts.read().unwrap()).unwrap();
+        while reader.is_valid() {
+            let batch = reader.current_data_batch();
+            res.push(extract_data_batch(&batch));
+            reader.next().unwrap();
+        }
+
+        assert_eq!(expected, res);
+    }
+
+    #[test]
+    fn test_data_parts_reader_dedup() {
+        check_data_parts_reader_dedup(vec![vec![(0, vec![1, 2])]], vec![(0, vec![(1, 0), (2, 1)])]);
+
+        check_data_parts_reader_dedup(
+            vec![
+                vec![(0, vec![1, 2])],
+                vec![(0, vec![1, 2])],
+                vec![(0, vec![2, 3])],
+            ],
+            vec![(0, vec![(1, 2)]), (0, vec![(2, 4)]), (0, vec![(3, 5)])],
+        );
+
+        check_data_parts_reader_dedup(
+            vec![vec![(0, vec![1])], vec![(0, vec![2])], vec![(0, vec![3])]],
+            vec![(0, vec![(1, 0)]), (0, vec![(2, 1)]), (0, vec![(3, 2)])],
+        );
+
+        check_data_parts_reader_dedup(
+            vec![vec![(0, vec![1])], vec![(0, vec![1])], vec![(0, vec![1])]],
+            vec![(0, vec![(1, 2)])],
+        );
+
+        check_data_parts_reader_dedup(
+            vec![vec![(0, vec![1])], vec![(1, vec![1])], vec![(2, vec![1])]],
+            vec![(0, vec![(1, 0)]), (1, vec![(1, 1)]), (2, vec![(1, 2)])],
+        );
+    }
+}
diff --git a/src/mito2/src/memtable/merge_tree/merger.rs b/src/mito2/src/memtable/merge_tree/merger.rs
index a6394ea924..c5012e5ee8 100644
--- a/src/mito2/src/memtable/merge_tree/merger.rs
+++ b/src/mito2/src/memtable/merge_tree/merger.rs
@@ -357,7 +357,7 @@ mod tests {
     #[test]
     fn test_merger() {
         let metadata = metadata_for_test();
-        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10);
+        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
         let weight = &[2, 1, 0];
         let mut seq = 0;
         write_rows_to_buffer(&mut buffer1, &metadata, 1, vec![2, 3], &mut seq);
@@ -366,7 +366,7 @@ mod tests {
             buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
         ));

-        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10);
+        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
         write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![3], &mut seq);
         write_rows_to_buffer(&mut buffer2, &metadata, 0, vec![1], &mut seq);
         let node2 = DataNode::new(DataSource::Part(
@@ -388,7 +388,7 @@ mod tests {
     #[test]
     fn test_merger2() {
         let metadata = metadata_for_test();
-        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10);
+        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
         let weight = &[2, 1, 0];
         let mut seq = 0;
         write_rows_to_buffer(&mut buffer1, &metadata, 1, vec![2, 3], &mut seq);
@@ -397,13 +397,13 @@ mod tests {
             buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
         ));

-        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10);
+        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
         write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![3], &mut seq);
         let node2 = DataNode::new(DataSource::Part(
             buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
         ));

-        let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10);
+        let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10, true);
         write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq);
         let node3 = DataNode::new(DataSource::Part(
             buffer3.freeze(Some(weight), true).unwrap().read().unwrap(),
@@ -426,7 +426,7 @@ mod tests {
     #[test]
     fn test_merger_overlapping() {
         let metadata = metadata_for_test();
-        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10);
+        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
         let weight = &[0, 1, 2];
         let mut seq = 0;
         write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 3], &mut seq);
@@ -434,13 +434,13 @@ mod tests {
             buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
         ));

-        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10);
+        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
         write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq);
         let node2 = DataNode::new(DataSource::Part(
             buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
         ));

-        let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10);
+        let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10, true);
         write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq);
         let node3 = DataNode::new(DataSource::Part(
             buffer3.freeze(Some(weight), true).unwrap().read().unwrap(),
@@ -462,19 +462,19 @@ mod tests {
     #[test]
     fn test_merger_parts_and_buffer() {
         let metadata = metadata_for_test();
-        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10);
+        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
         let weight = &[0, 1, 2];
         let mut seq = 0;
         write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 3], &mut seq);
         let node1 = DataNode::new(DataSource::Buffer(buffer1.read(Some(weight)).unwrap()));

-        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10);
+        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
         write_rows_to_buffer(&mut buffer2, &metadata, 1, vec![2, 3], &mut seq);
         let node2 = DataNode::new(DataSource::Part(
             buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
         ));

-        let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10);
+        let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10, true);
         write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2, 3], &mut seq);
         let node3 = DataNode::new(DataSource::Part(
             buffer3.freeze(Some(weight), true).unwrap().read().unwrap(),
@@ -496,7 +496,7 @@ mod tests {
     #[test]
     fn test_merger_overlapping_2() {
         let metadata = metadata_for_test();
-        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10);
+        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
         let weight = &[0, 1, 2];
         let mut seq = 0;
         write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![1, 2, 2], &mut seq);
@@ -504,13 +504,13 @@ mod tests {
             buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
         ));

-        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10);
+        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
         write_rows_to_buffer(&mut buffer2, &metadata, 0, vec![2], &mut seq);
         let node2 = DataNode::new(DataSource::Part(
             buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
         ));

-        let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10);
+        let mut buffer3 = DataBuffer::with_capacity(metadata.clone(), 10, true);
         write_rows_to_buffer(&mut buffer3, &metadata, 0, vec![2], &mut seq);
         let node3 = DataNode::new(DataSource::Part(
             buffer3.freeze(Some(weight), true).unwrap().read().unwrap(),
@@ -530,7 +530,7 @@ mod tests {
     #[test]
     fn test_merger_overlapping_3() {
         let metadata = metadata_for_test();
-        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10);
+        let mut buffer1 = DataBuffer::with_capacity(metadata.clone(), 10, true);
         let weight = &[0, 1, 2];
         let mut seq = 0;
         write_rows_to_buffer(&mut buffer1, &metadata, 0, vec![0, 1], &mut seq);
@@ -538,7 +538,7 @@ mod tests {
             buffer1.freeze(Some(weight), true).unwrap().read().unwrap(),
         ));

-        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10);
+        let mut buffer2 = DataBuffer::with_capacity(metadata.clone(), 10, true);
         write_rows_to_buffer(&mut buffer2, &metadata, 0, vec![1], &mut seq);
        let node2 = DataNode::new(DataSource::Part(
             buffer2.freeze(Some(weight), true).unwrap().read().unwrap(),
diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs
index dc817d134d..89302906b2 100644
--- a/src/mito2/src/memtable/merge_tree/partition.rs
+++ b/src/mito2/src/memtable/merge_tree/partition.rs
@@ -46,7 +46,7 @@ impl Partition {
         let shard_builder = ShardBuilder::new(metadata.clone(), config);

         Partition {
-            inner: RwLock::new(Inner::new(metadata, shard_builder)),
+            inner: RwLock::new(Inner::new(metadata, shard_builder, config.dedup)),
         }
     }

@@ -128,6 +128,7 @@ impl Partition {
                 active_shard_id: inner.active_shard_id,
                 shards,
                 num_rows: 0,
+                dedup: config.dedup,
             }),
         }
     }
@@ -194,21 +195,23 @@ struct Inner {
     /// Shards with frozen dictionary.
     shards: Vec<Shard>,
     num_rows: usize,
+    dedup: bool,
 }

 impl Inner {
-    fn new(metadata: RegionMetadataRef, shard_builder: ShardBuilder) -> Self {
+    fn new(metadata: RegionMetadataRef, shard_builder: ShardBuilder, dedup: bool) -> Self {
         let mut inner = Self {
             metadata,
             shard_builder,
             active_shard_id: 0,
             shards: Vec::new(),
             num_rows: 0,
+            dedup,
         };

         if inner.metadata.primary_key.is_empty() {
-            let data_parts = DataParts::new(inner.metadata.clone(), DATA_INIT_CAP);
-            inner.shards.push(Shard::new(0, None, data_parts));
+            let data_parts = DataParts::new(inner.metadata.clone(), DATA_INIT_CAP, dedup);
+            inner.shards.push(Shard::new(0, None, data_parts, dedup));
             inner.active_shard_id = 1;
         }

diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs
index 86c5ea18f1..a9ad6e30b8 100644
--- a/src/mito2/src/memtable/merge_tree/shard.rs
+++ b/src/mito2/src/memtable/merge_tree/shard.rs
@@ -28,15 +28,22 @@ pub struct Shard {
     key_dict: Option<KeyDictRef>,
     /// Data in the shard.
     data_parts: DataParts,
+    dedup: bool,
 }

 impl Shard {
     /// Returns a new shard.
-    pub fn new(shard_id: ShardId, key_dict: Option<KeyDictRef>, data_parts: DataParts) -> Shard {
+    pub fn new(
+        shard_id: ShardId,
+        key_dict: Option<KeyDictRef>,
+        data_parts: DataParts,
+        dedup: bool,
+    ) -> Shard {
         Shard {
             shard_id,
             key_dict,
             data_parts,
+            dedup,
         }
     }

@@ -77,7 +84,8 @@ impl Shard {
         Shard {
             shard_id: self.shard_id,
             key_dict: self.key_dict.clone(),
-            data_parts: DataParts::new(metadata, DATA_INIT_CAP),
+            data_parts: DataParts::new(metadata, DATA_INIT_CAP, self.dedup),
+            dedup: self.dedup,
         }
     }
 }

@@ -144,9 +152,9 @@ mod tests {
         }

         let dict = dict_builder.finish().unwrap();
-        let data_parts = DataParts::new(metadata, DATA_INIT_CAP);
+        let data_parts = DataParts::new(metadata, DATA_INIT_CAP, true);

-        Shard::new(shard_id, Some(Arc::new(dict)), data_parts)
+        Shard::new(shard_id, Some(Arc::new(dict)), data_parts, true)
     }

     #[test]
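`Partition`, `Shard`, and (below) `ShardBuilder` each keep their own copy of the flag instead of re-reading the config, since `Shard::fork` must rebuild an empty `DataParts` long after the `MergeTreeConfig` is out of reach. A minimal sketch of that ownership choice with simplified stand-in types, not the mito2 ones:

```rust
// Stand-ins for DataParts/Shard; only the flag ownership is modeled.
struct Parts {
    dedup: bool,
}

struct Shard {
    parts: Parts,
    // Kept alongside `parts` so fork() can rebuild them without a config.
    dedup: bool,
}

impl Shard {
    fn new(parts: Parts, dedup: bool) -> Shard {
        Shard { parts, dedup }
    }

    // Mirrors Shard::fork in the patch: fresh, empty parts, same dedup behavior.
    fn fork(&self) -> Shard {
        Shard::new(Parts { dedup: self.dedup }, self.dedup)
    }
}

fn main() {
    let forked = Shard::new(Parts { dedup: true }, true).fork();
    assert!(forked.parts.dedup && forked.dedup);
}
```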
diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs
index 96e33ce069..68ebac37a2 100644
--- a/src/mito2/src/memtable/merge_tree/shard_builder.rs
+++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs
@@ -38,15 +38,18 @@ pub struct ShardBuilder {
     data_buffer: DataBuffer,
     /// Number of rows to freeze a data part.
     data_freeze_threshold: usize,
+    dedup: bool,
 }

 impl ShardBuilder {
     /// Returns a new builder.
     pub fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> ShardBuilder {
+        let dedup = config.dedup;
         ShardBuilder {
             dict_builder: KeyDictBuilder::new(config.index_max_keys_per_shard),
-            data_buffer: DataBuffer::with_capacity(metadata, DATA_INIT_CAP),
+            data_buffer: DataBuffer::with_capacity(metadata, DATA_INIT_CAP, dedup),
             data_freeze_threshold: config.data_freeze_threshold,
+            dedup,
         }
     }

@@ -87,10 +90,11 @@ impl ShardBuilder {
         };

         // build data parts.
-        let data_parts = DataParts::new(metadata, DATA_INIT_CAP).with_frozen(vec![data_part]);
+        let data_parts =
+            DataParts::new(metadata, DATA_INIT_CAP, self.dedup).with_frozen(vec![data_part]);
         let key_dict = key_dict.map(Arc::new);

-        Ok(Some(Shard::new(shard_id, key_dict, data_parts)))
+        Ok(Some(Shard::new(shard_id, key_dict, data_parts, self.dedup)))
     }

     /// Scans the shard builder.
@@ -165,9 +169,9 @@ mod tests {
         }

         let dict = dict_builder.finish().unwrap();
-        let data_parts = DataParts::new(metadata, DATA_INIT_CAP);
+        let data_parts = DataParts::new(metadata, DATA_INIT_CAP, true);

-        Shard::new(shard_id, Some(Arc::new(dict)), data_parts)
+        Shard::new(shard_id, Some(Arc::new(dict)), data_parts, true)
     }

     #[test]
diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs
index a640d3c20e..26d110415b 100644
--- a/src/mito2/src/test_util/memtable_util.rs
+++ b/src/mito2/src/test_util/memtable_util.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
 use api::helper::ColumnDataTypeWrapper;
 use api::v1::value::ValueData;
 use api::v1::{Row, Rows, SemanticType};
+use datatypes::arrow::array::UInt64Array;
 use datatypes::data_type::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
 use datatypes::value::ValueRef;
@@ -29,6 +30,7 @@ use table::predicate::Predicate;

 use crate::error::Result;
 use crate::memtable::key_values::KeyValue;
+use crate::memtable::merge_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer};
 use crate::memtable::{
     BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRef,
     MemtableStats,
@@ -177,6 +179,46 @@ pub(crate) fn build_key_values(
     )
 }

+pub(crate) fn write_rows_to_buffer(
+    buffer: &mut DataBuffer,
+    schema: &RegionMetadataRef,
+    pk_index: u16,
+    ts: Vec<i64>,
+    v0: Vec<Option<f64>>,
+    sequence: u64,
+) {
+    let kvs = crate::test_util::memtable_util::build_key_values_with_ts_seq_values(
+        schema,
+        "whatever".to_string(),
+        1,
+        ts.into_iter(),
+        v0.into_iter(),
+        sequence,
+    );
+
+    for kv in kvs.iter() {
+        buffer.write_row(pk_index, kv);
+    }
+}
+
+/// Extracts pk index, timestamps and sequences from [DataBatch].
+pub(crate) fn extract_data_batch(batch: &DataBatch) -> (u16, Vec<(i64, u64)>) {
+    let rb = batch.slice_record_batch();
+    let ts = timestamp_array_to_i64_slice(rb.column(1));
+    let seq = rb
+        .column(2)
+        .as_any()
+        .downcast_ref::<UInt64Array>()
+        .unwrap()
+        .values();
+    let ts_and_seq = ts
+        .iter()
+        .zip(seq.iter())
+        .map(|(ts, seq)| (*ts, *seq))
+        .collect::<Vec<_>>();
+
+    (batch.pk_index(), ts_and_seq)
+}
+
 /// Builds key values with timestamps (ms) and sequences for test.
 pub(crate) fn build_key_values_with_ts_seq_values(
     schema: &RegionMetadataRef,