feat(mito): filters memtables by their time ranges (#2686)

* feat: filter memtable by time range

* fix: incorrect time range returned by time series memtable

* test: test memtable pruning
This commit is contained in:
Yingwen
2023-11-03 16:48:21 +08:00
committed by GitHub
parent 0dca63bc7b
commit 395632c874
3 changed files with 102 additions and 3 deletions

View File

@@ -136,3 +136,80 @@ async fn test_prune_tag_and_field() {
)
.await;
}
/// Builds a filter on the `ts` column that matches the half-open range
/// `[start_sec, end_sec)`.
///
/// Both bounds are given in seconds and are converted to millisecond
/// timestamp literals before being compared against the column.
fn time_range_expr(start_sec: i64, end_sec: i64) -> Expr {
    // Inclusive lower bound: ts >= start.
    let lower_bound = lit(ScalarValue::TimestampMillisecond(Some(start_sec * 1000), None));
    let at_or_after_start = col("ts").gt_eq(lower_bound);

    // Exclusive upper bound: ts < end.
    let upper_bound = lit(ScalarValue::TimestampMillisecond(Some(end_sec * 1000), None));
    let before_end = col("ts").lt(upper_bound);

    Expr::from(at_or_after_start.and(before_end))
}
/// Checks that a scan filtered by a time range only returns rows inside that
/// range when the region holds data both in a flushed SST and in a memtable.
///
/// The first batch of rows is written and flushed, the second batch is written
/// afterwards and stays unflushed. The query filter is `[0s, 20s)`, so only the
/// rows of the first batch are expected in the result.
#[tokio::test]
async fn test_prune_memtable() {
    let mut env = TestEnv::new();
    let engine = env.create_engine(MitoConfig::default()).await;

    let region_id = RegionId::new(1, 1);
    let request = CreateRequestBuilder::new().build();

    // Capture the schema before `request` is moved into the create call.
    let column_schemas = rows_schema(&request);
    engine
        .handle_request(region_id, RegionRequest::Create(request))
        .await
        .unwrap();

    // 5 ~ 10 in SST
    put_rows(
        &engine,
        region_id,
        Rows {
            schema: column_schemas.clone(),
            rows: build_rows(5, 10),
        },
    )
    .await;
    flush_region(&engine, region_id, Some(5)).await;

    // 20 ~ 30 in memtable
    put_rows(
        &engine,
        region_id,
        Rows {
            schema: column_schemas.clone(),
            rows: build_rows(20, 30),
        },
    )
    .await;

    // The filter range ends (exclusively) at 20s, so none of the rows written
    // after the flush should show up in the result.
    let stream = engine
        .handle_query(
            region_id,
            ScanRequest {
                filters: vec![time_range_expr(0, 20)],
                ..Default::default()
            },
        )
        .await
        .unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();

    // Cells are padded to the column widths given by the border lines; the
    // table rows must stay at column 0 because they are part of the literal.
    let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts                  |
+-------+---------+---------------------+
| 5     | 5.0     | 1970-01-01T00:00:05 |
| 6     | 6.0     | 1970-01-01T00:00:06 |
| 7     | 7.0     | 1970-01-01T00:00:07 |
| 8     | 8.0     | 1970-01-01T00:00:08 |
| 9     | 9.0     | 1970-01-01T00:00:09 |
+-------+---------+---------------------+";
    assert_eq!(expected, batches.pretty_print().unwrap());
}

View File

@@ -258,7 +258,7 @@ impl Memtable for TimeSeriesMemtable {
let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
MemtableStats {
estimated_bytes,
time_range: Some((max_timestamp, min_timestamp)),
time_range: Some((min_timestamp, max_timestamp)),
}
}
}
@@ -1047,6 +1047,16 @@ mod tests {
.map(|v| v.unwrap().0.value())
.collect::<HashSet<_>>();
assert_eq!(expected_ts, read);
let stats = memtable.stats();
assert!(stats.bytes_allocated() > 0);
assert_eq!(
Some((
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(99)
)),
stats.time_range()
);
}
#[test]

View File

@@ -157,10 +157,22 @@ impl ScanRegion {
}
let memtables = self.version.memtables.list_memtables();
// Skip empty memtables.
// Skip empty memtables and memtables out of time range.
let memtables: Vec<_> = memtables
.into_iter()
.filter(|mem| !mem.is_empty())
.filter(|mem| {
if mem.is_empty() {
return false;
}
let stats = mem.stats();
let Some((start, end)) = stats.time_range() else {
return true;
};
// The time range of the memtable is inclusive.
let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
memtable_range.intersects(&time_range)
})
.collect();
debug!(