feat: support append-only mode in time-series memtable (#3540)

* feat: support append-only mode in time-series memtable

* fix: rename sort_and_dedup to sort
This commit is contained in:
Lei, HUANG
2024-03-20 04:37:54 +08:00
committed by GitHub
parent 5b315c2d40
commit ddbcff68dd
3 changed files with 119 additions and 44 deletions

View File

@@ -51,7 +51,7 @@ fn write_rows(c: &mut Criterion) {
});
});
group.bench_function("time_series", |b| {
let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None);
let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true);
let kvs =
memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
b.iter(|| {
@@ -83,7 +83,7 @@ fn full_scan(c: &mut Criterion) {
});
});
group.bench_function("time_series", |b| {
let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None);
let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true);
for kvs in generator.iter() {
memtable.write(&kvs).unwrap();
}
@@ -121,7 +121,7 @@ fn filter_1_host(c: &mut Criterion) {
});
});
group.bench_function("time_series", |b| {
let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None);
let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true);
for kvs in generator.iter() {
memtable.write(&kvs).unwrap();
}

View File

@@ -71,6 +71,7 @@ impl MemtableBuilder for TimeSeriesMemtableBuilder {
metadata.clone(),
id,
self.write_buffer_manager.clone(),
true, // todo(hl): set according to region option
))
}
}
@@ -84,6 +85,7 @@ pub struct TimeSeriesMemtable {
alloc_tracker: AllocTracker,
max_timestamp: AtomicI64,
min_timestamp: AtomicI64,
dedup: bool,
}
impl TimeSeriesMemtable {
@@ -91,6 +93,7 @@ impl TimeSeriesMemtable {
region_metadata: RegionMetadataRef,
id: MemtableId,
write_buffer_manager: Option<WriteBufferManagerRef>,
dedup: bool,
) -> Self {
let row_codec = Arc::new(McmpRowCodec::new(
region_metadata
@@ -107,6 +110,7 @@ impl TimeSeriesMemtable {
alloc_tracker: AllocTracker::new(write_buffer_manager),
max_timestamp: AtomicI64::new(i64::MIN),
min_timestamp: AtomicI64::new(i64::MAX),
dedup,
}
}
@@ -232,7 +236,7 @@ impl Memtable for TimeSeriesMemtable {
.collect()
};
let iter = self.series_set.iter_series(projection, filters);
let iter = self.series_set.iter_series(projection, filters, self.dedup);
Ok(Box::new(iter))
}
@@ -277,6 +281,7 @@ impl Memtable for TimeSeriesMemtable {
metadata.clone(),
id,
self.alloc_tracker.write_buffer_manager(),
self.dedup,
))
}
}
@@ -336,7 +341,12 @@ impl SeriesSet {
}
/// Iterates all series in [SeriesSet].
fn iter_series(&self, projection: HashSet<ColumnId>, predicate: Option<Predicate>) -> Iter {
fn iter_series(
&self,
projection: HashSet<ColumnId>,
predicate: Option<Predicate>,
dedup: bool,
) -> Iter {
let primary_key_schema = primary_key_schema(&self.region_metadata);
let primary_key_datatypes = self
.region_metadata
@@ -352,6 +362,7 @@ impl SeriesSet {
primary_key_schema,
primary_key_datatypes,
self.codec.clone(),
dedup,
)
}
}
@@ -398,10 +409,12 @@ struct Iter {
pk_schema: arrow::datatypes::SchemaRef,
pk_datatypes: Vec<ConcreteDataType>,
codec: Arc<McmpRowCodec>,
dedup: bool,
metrics: Metrics,
}
impl Iter {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
metadata: RegionMetadataRef,
series: Arc<SeriesRwLockMap>,
@@ -410,6 +423,7 @@ impl Iter {
pk_schema: arrow::datatypes::SchemaRef,
pk_datatypes: Vec<ConcreteDataType>,
codec: Arc<McmpRowCodec>,
dedup: bool,
) -> Self {
let simple_filters = predicate
.map(|p| {
@@ -428,6 +442,7 @@ impl Iter {
pk_schema,
pk_datatypes,
codec,
dedup,
metrics: Metrics::default(),
}
}
@@ -484,8 +499,9 @@ impl Iterator for Iter {
self.last_key = Some(primary_key.clone());
let values = series.compact(&self.metadata);
let batch =
values.and_then(|v| v.to_batch(primary_key, &self.metadata, &self.projection));
let batch = values.and_then(|v| {
v.to_batch(primary_key, &self.metadata, &self.projection, self.dedup)
});
// Update metrics.
self.metrics.num_batches += 1;
@@ -703,6 +719,7 @@ impl Values {
primary_key: &[u8],
metadata: &RegionMetadataRef,
projection: &HashSet<ColumnId>,
dedup: bool,
) -> Result<Batch> {
let builder = BatchBuilder::with_required_columns(
primary_key.to_vec(),
@@ -723,7 +740,7 @@ impl Values {
.collect();
let mut batch = builder.with_fields(fields).build()?;
batch.sort_and_dedup()?;
batch.sort(dedup)?;
Ok(batch)
}
@@ -796,7 +813,7 @@ impl From<ValueBuilder> for Values {
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use api::helper::ColumnDataTypeWrapper;
use api::v1::value::ValueData;
@@ -983,7 +1000,12 @@ mod tests {
};
let batch = values
.to_batch(b"test", &schema, &[0, 1, 2, 3, 4].into_iter().collect())
.to_batch(
b"test",
&schema,
&[0, 1, 2, 3, 4].into_iter().collect(),
true,
)
.unwrap();
check_value(
&batch,
@@ -1147,18 +1169,29 @@ mod tests {
#[test]
fn test_memtable() {
common_telemetry::init_default_ut_logging();
check_memtable_dedup(true);
check_memtable_dedup(false);
}
fn check_memtable_dedup(dedup: bool) {
let schema = schema_for_test();
let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
let memtable = TimeSeriesMemtable::new(schema, 42, None);
let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup);
memtable.write(&kvs).unwrap();
memtable.write(&kvs).unwrap();
let expected_ts = kvs
let mut expected_ts: HashMap<i64, usize> = HashMap::new();
for ts in kvs
.iter()
.map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
.collect::<HashSet<_>>();
{
*expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
}
let iter = memtable.iter(None, None).unwrap();
let read = iter
let mut read = HashMap::new();
for ts in iter
.flat_map(|batch| {
batch
.unwrap()
@@ -1171,7 +1204,9 @@ mod tests {
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<HashSet<_>>();
{
*read.entry(ts).or_default() += 1;
}
assert_eq!(expected_ts, read);
let stats = memtable.stats();
@@ -1190,7 +1225,7 @@ mod tests {
common_telemetry::init_default_ut_logging();
let schema = schema_for_test();
let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
let memtable = TimeSeriesMemtable::new(schema, 42, None);
let memtable = TimeSeriesMemtable::new(schema, 42, None, true);
memtable.write(&kvs).unwrap();
let iter = memtable.iter(Some(&[3]), None).unwrap();

View File

@@ -334,12 +334,13 @@ impl Batch {
Ok(())
}
/// Sorts and dedup rows in the batch.
/// Sorts rows in the batch. If `dedup` is true, it also removes
/// duplicated rows according to primary keys.
///
/// It orders rows by timestamp, sequence desc. When `dedup` is true, it only
/// keeps the latest row for the same timestamp. It doesn't consider op type as
/// sequence should already provide uniqueness for a row.
pub fn sort_and_dedup(&mut self) -> Result<()> {
pub fn sort(&mut self, dedup: bool) -> Result<()> {
// If building a converter each time is costly, we may allow passing a
// converter.
let converter = RowConverter::new(vec![
@@ -362,14 +363,16 @@ impl Batch {
let mut to_sort: Vec<_> = rows.iter().enumerate().collect();
to_sort.sort_unstable_by(|left, right| left.1.cmp(&right.1));
// Dedup by timestamps.
to_sort.dedup_by(|left, right| {
debug_assert_eq!(18, left.1.as_ref().len());
debug_assert_eq!(18, right.1.as_ref().len());
let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
// We only compare the timestamp part and ignore sequence.
left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
});
if dedup {
// Dedup by timestamps.
to_sort.dedup_by(|left, right| {
debug_assert_eq!(18, left.1.as_ref().len());
debug_assert_eq!(18, right.1.as_ref().len());
let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
// We only compare the timestamp part and ignore sequence.
left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
});
}
let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
self.take_in_place(&indices)
@@ -983,7 +986,7 @@ mod tests {
#[test]
fn test_sort_and_dedup() {
let mut batch = new_batch(
let original = new_batch(
&[2, 3, 1, 4, 5, 2],
&[1, 2, 3, 4, 5, 6],
&[
@@ -996,30 +999,67 @@ mod tests {
],
&[21, 22, 23, 24, 25, 26],
);
batch.sort_and_dedup().unwrap();
// It should only keep one timestamp 2.
let expect = new_batch(
&[1, 2, 3, 4, 5],
&[3, 6, 2, 4, 5],
&[
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
],
&[23, 26, 22, 24, 25],
);
assert_eq!(expect, batch);
let mut batch = new_batch(
let mut batch = original.clone();
batch.sort(true).unwrap();
// It should only keep one timestamp 2.
assert_eq!(
new_batch(
&[1, 2, 3, 4, 5],
&[3, 6, 2, 4, 5],
&[
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
],
&[23, 26, 22, 24, 25],
),
batch
);
let mut batch = original.clone();
batch.sort(false).unwrap();
// It should keep both rows with timestamp 2 because dedup is disabled.
assert_eq!(
new_batch(
&[1, 2, 2, 3, 4, 5],
&[3, 6, 1, 2, 4, 5],
&[
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
],
&[23, 26, 21, 22, 24, 25],
),
batch
);
let original = new_batch(
&[2, 2, 1],
&[1, 6, 1],
&[OpType::Delete, OpType::Put, OpType::Put],
&[21, 22, 23],
);
batch.sort_and_dedup().unwrap();
let mut batch = original.clone();
batch.sort(true).unwrap();
let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
assert_eq!(expect, batch);
let mut batch = original.clone();
batch.sort(false).unwrap();
let expect = new_batch(
&[1, 2, 2],
&[1, 6, 1],
&[OpType::Put, OpType::Put, OpType::Delete],
&[23, 22, 21],
);
assert_eq!(expect, batch);
}
}