From ddbcff68dddb80ac3cb9c4d2069d7ecd39a653fc Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 20 Mar 2024 04:37:54 +0800 Subject: [PATCH] feat: support append-only mode in time-series memtable (#3540) * feat: support append-only mode in time-series memtable * fix: rename sort_and_dedup to sort --- src/mito2/benches/memtable_bench.rs | 6 +- src/mito2/src/memtable/time_series.rs | 61 +++++++++++++---- src/mito2/src/read.rs | 96 +++++++++++++++++++-------- 3 files changed, 119 insertions(+), 44 deletions(-) diff --git a/src/mito2/benches/memtable_bench.rs b/src/mito2/benches/memtable_bench.rs index 30701fe2ba..e20c44b424 100644 --- a/src/mito2/benches/memtable_bench.rs +++ b/src/mito2/benches/memtable_bench.rs @@ -51,7 +51,7 @@ fn write_rows(c: &mut Criterion) { }); }); group.bench_function("time_series", |b| { - let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None); + let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true); let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 42, ×tamps, 1); b.iter(|| { @@ -83,7 +83,7 @@ fn full_scan(c: &mut Criterion) { }); }); group.bench_function("time_series", |b| { - let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None); + let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true); for kvs in generator.iter() { memtable.write(&kvs).unwrap(); } @@ -121,7 +121,7 @@ fn filter_1_host(c: &mut Criterion) { }); }); group.bench_function("time_series", |b| { - let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None); + let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true); for kvs in generator.iter() { memtable.write(&kvs).unwrap(); } diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index c31f9bea7b..5403a8fea2 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -71,6 +71,7 @@ impl MemtableBuilder for TimeSeriesMemtableBuilder { metadata.clone(), id, self.write_buffer_manager.clone(), + true, // todo(hl): set according to region option )) } } @@ -84,6 +85,7 @@ pub struct TimeSeriesMemtable { alloc_tracker: AllocTracker, max_timestamp: AtomicI64, min_timestamp: AtomicI64, + dedup: bool, } impl TimeSeriesMemtable { @@ -91,6 +93,7 @@ impl TimeSeriesMemtable { region_metadata: RegionMetadataRef, id: MemtableId, write_buffer_manager: Option, + dedup: bool, ) -> Self { let row_codec = Arc::new(McmpRowCodec::new( region_metadata @@ -107,6 +110,7 @@ impl TimeSeriesMemtable { alloc_tracker: AllocTracker::new(write_buffer_manager), max_timestamp: AtomicI64::new(i64::MIN), min_timestamp: AtomicI64::new(i64::MAX), + dedup, } } @@ -232,7 +236,7 @@ impl Memtable for TimeSeriesMemtable { .collect() }; - let iter = self.series_set.iter_series(projection, filters); + let iter = self.series_set.iter_series(projection, filters, self.dedup); Ok(Box::new(iter)) } @@ -277,6 +281,7 @@ impl Memtable for TimeSeriesMemtable { metadata.clone(), id, self.alloc_tracker.write_buffer_manager(), + self.dedup, )) } } @@ -336,7 +341,12 @@ impl SeriesSet { } /// Iterates all series in [SeriesSet]. - fn iter_series(&self, projection: HashSet, predicate: Option) -> Iter { + fn iter_series( + &self, + projection: HashSet, + predicate: Option, + dedup: bool, + ) -> Iter { let primary_key_schema = primary_key_schema(&self.region_metadata); let primary_key_datatypes = self .region_metadata @@ -352,6 +362,7 @@ impl SeriesSet { primary_key_schema, primary_key_datatypes, self.codec.clone(), + dedup, ) } } @@ -398,10 +409,12 @@ struct Iter { pk_schema: arrow::datatypes::SchemaRef, pk_datatypes: Vec, codec: Arc, + dedup: bool, metrics: Metrics, } impl Iter { + #[allow(clippy::too_many_arguments)] pub(crate) fn new( metadata: RegionMetadataRef, series: Arc, @@ -410,6 +423,7 @@ impl Iter { pk_schema: arrow::datatypes::SchemaRef, pk_datatypes: Vec, codec: Arc, + dedup: bool, ) -> Self { let simple_filters = predicate .map(|p| { @@ -428,6 +442,7 @@ impl Iter { pk_schema, pk_datatypes, codec, + dedup, metrics: Metrics::default(), } } @@ -484,8 +499,9 @@ impl Iterator for Iter { self.last_key = Some(primary_key.clone()); let values = series.compact(&self.metadata); - let batch = - values.and_then(|v| v.to_batch(primary_key, &self.metadata, &self.projection)); + let batch = values.and_then(|v| { + v.to_batch(primary_key, &self.metadata, &self.projection, self.dedup) + }); // Update metrics. self.metrics.num_batches += 1; @@ -703,6 +719,7 @@ impl Values { primary_key: &[u8], metadata: &RegionMetadataRef, projection: &HashSet, + dedup: bool, ) -> Result { let builder = BatchBuilder::with_required_columns( primary_key.to_vec(), @@ -723,7 +740,7 @@ impl Values { .collect(); let mut batch = builder.with_fields(fields).build()?; - batch.sort_and_dedup()?; + batch.sort(dedup)?; Ok(batch) } @@ -796,7 +813,7 @@ impl From for Values { #[cfg(test)] mod tests { - use std::collections::HashSet; + use std::collections::{HashMap, HashSet}; use api::helper::ColumnDataTypeWrapper; use api::v1::value::ValueData; @@ -983,7 +1000,12 @@ mod tests { }; let batch = values - .to_batch(b"test", &schema, &[0, 1, 2, 3, 4].into_iter().collect()) + .to_batch( + b"test", + &schema, + &[0, 1, 2, 3, 4].into_iter().collect(), + true, + ) .unwrap(); check_value( &batch, @@ -1147,18 +1169,29 @@ mod tests { #[test] fn test_memtable() { common_telemetry::init_default_ut_logging(); + check_memtable_dedup(true); + check_memtable_dedup(false); + } + + fn check_memtable_dedup(dedup: bool) { let schema = schema_for_test(); let kvs = build_key_values(&schema, "hello".to_string(), 42, 100); - let memtable = TimeSeriesMemtable::new(schema, 42, None); + let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup); + memtable.write(&kvs).unwrap(); memtable.write(&kvs).unwrap(); - let expected_ts = kvs + let mut expected_ts: HashMap = HashMap::new(); + for ts in kvs .iter() .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value()) - .collect::>(); + { + *expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 }; + } let iter = memtable.iter(None, None).unwrap(); - let read = iter + let mut read = HashMap::new(); + + for ts in iter .flat_map(|batch| { batch .unwrap() @@ -1171,7 +1204,9 @@ mod tests { .into_iter() }) .map(|v| v.unwrap().0.value()) - .collect::>(); + { + *read.entry(ts).or_default() += 1; + } assert_eq!(expected_ts, read); let stats = memtable.stats(); @@ -1190,7 +1225,7 @@ mod tests { common_telemetry::init_default_ut_logging(); let schema = schema_for_test(); let kvs = build_key_values(&schema, "hello".to_string(), 42, 100); - let memtable = TimeSeriesMemtable::new(schema, 42, None); + let memtable = TimeSeriesMemtable::new(schema, 42, None, true); memtable.write(&kvs).unwrap(); let iter = memtable.iter(Some(&[3]), None).unwrap(); diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index a4b3d8dbe2..9a9820b294 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -334,12 +334,13 @@ impl Batch { Ok(()) } - /// Sorts and dedup rows in the batch. + /// Sorts rows in the batch. If `dedup` is true, it also removes + /// duplicated rows according to primary keys. /// /// It orders rows by timestamp, sequence desc and only keep the latest /// row for the same timestamp. It doesn't consider op type as sequence /// should already provide uniqueness for a row. - pub fn sort_and_dedup(&mut self) -> Result<()> { + pub fn sort(&mut self, dedup: bool) -> Result<()> { // If building a converter each time is costly, we may allow passing a // converter. let converter = RowConverter::new(vec![ @@ -362,14 +363,16 @@ impl Batch { let mut to_sort: Vec<_> = rows.iter().enumerate().collect(); to_sort.sort_unstable_by(|left, right| left.1.cmp(&right.1)); - // Dedup by timestamps. - to_sort.dedup_by(|left, right| { - debug_assert_eq!(18, left.1.as_ref().len()); - debug_assert_eq!(18, right.1.as_ref().len()); - let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref()); - // We only compare the timestamp part and ignore sequence. - left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN] - }); + if dedup { + // Dedup by timestamps. + to_sort.dedup_by(|left, right| { + debug_assert_eq!(18, left.1.as_ref().len()); + debug_assert_eq!(18, right.1.as_ref().len()); + let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref()); + // We only compare the timestamp part and ignore sequence. + left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN] + }); + } let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32)); self.take_in_place(&indices) @@ -983,7 +986,7 @@ mod tests { #[test] fn test_sort_and_dedup() { - let mut batch = new_batch( + let original = new_batch( &[2, 3, 1, 4, 5, 2], &[1, 2, 3, 4, 5, 6], &[ @@ -996,30 +999,67 @@ mod tests { ], &[21, 22, 23, 24, 25, 26], ); - batch.sort_and_dedup().unwrap(); - // It should only keep one timestamp 2. - let expect = new_batch( - &[1, 2, 3, 4, 5], - &[3, 6, 2, 4, 5], - &[ - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - OpType::Put, - ], - &[23, 26, 22, 24, 25], - ); - assert_eq!(expect, batch); - let mut batch = new_batch( + let mut batch = original.clone(); + batch.sort(true).unwrap(); + // It should only keep one timestamp 2. + assert_eq!( + new_batch( + &[1, 2, 3, 4, 5], + &[3, 6, 2, 4, 5], + &[ + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + ], + &[23, 26, 22, 24, 25], + ), + batch + ); + + let mut batch = original.clone(); + batch.sort(false).unwrap(); + + // It should only keep one timestamp 2. + assert_eq!( + new_batch( + &[1, 2, 2, 3, 4, 5], + &[3, 6, 1, 2, 4, 5], + &[ + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + ], + &[23, 26, 21, 22, 24, 25], + ), + batch + ); + + let original = new_batch( &[2, 2, 1], &[1, 6, 1], &[OpType::Delete, OpType::Put, OpType::Put], &[21, 22, 23], ); - batch.sort_and_dedup().unwrap(); + + let mut batch = original.clone(); + batch.sort(true).unwrap(); let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]); assert_eq!(expect, batch); + + let mut batch = original.clone(); + batch.sort(false).unwrap(); + let expect = new_batch( + &[1, 2, 2], + &[1, 6, 1], + &[OpType::Put, OpType::Put, OpType::Delete], + &[23, 22, 21], + ); + assert_eq!(expect, batch); } }