From 395632c874a84a2801da28553c4f83ef27932508 Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Fri, 3 Nov 2023 16:48:21 +0800
Subject: [PATCH] feat(mito): filters memtables by their time ranges (#2686)

* feat: filter memtable by time range

* fix: incorrect time range returned by time series memtable

* test: test memtable pruning
---
 src/mito2/src/engine/prune_test.rs    | 77 +++++++++++++++++++++++++++
 src/mito2/src/memtable/time_series.rs | 12 ++++-
 src/mito2/src/read/scan_region.rs     | 16 +++++-
 3 files changed, 102 insertions(+), 3 deletions(-)

diff --git a/src/mito2/src/engine/prune_test.rs b/src/mito2/src/engine/prune_test.rs
index 77f0d37dcb..17c213dfcf 100644
--- a/src/mito2/src/engine/prune_test.rs
+++ b/src/mito2/src/engine/prune_test.rs
@@ -136,3 +136,80 @@ async fn test_prune_tag_and_field() {
     )
     .await;
 }
+
+/// Creates a time range `[start_sec, end_sec)`
+fn time_range_expr(start_sec: i64, end_sec: i64) -> Expr {
+    Expr::from(
+        col("ts")
+            .gt_eq(lit(ScalarValue::TimestampMillisecond(
+                Some(start_sec * 1000),
+                None,
+            )))
+            .and(col("ts").lt(lit(ScalarValue::TimestampMillisecond(
+                Some(end_sec * 1000),
+                None,
+            )))),
+    )
+}
+
+#[tokio::test]
+async fn test_prune_memtable() {
+    let mut env = TestEnv::new();
+    let engine = env.create_engine(MitoConfig::default()).await;
+
+    let region_id = RegionId::new(1, 1);
+    let request = CreateRequestBuilder::new().build();
+
+    let column_schemas = rows_schema(&request);
+
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+
+    // 5 ~ 10 in SST
+    put_rows(
+        &engine,
+        region_id,
+        Rows {
+            schema: column_schemas.clone(),
+            rows: build_rows(5, 10),
+        },
+    )
+    .await;
+    flush_region(&engine, region_id, Some(5)).await;
+
+    // 20 ~ 30 in memtable
+    put_rows(
+        &engine,
+        region_id,
+        Rows {
+            schema: column_schemas.clone(),
+            rows: build_rows(20, 30),
+        },
+    )
+    .await;
+
+    let stream = engine
+        .handle_query(
+            region_id,
+            ScanRequest {
+                filters: vec![time_range_expr(0, 20)],
+                ..Default::default()
+            },
+        )
+        .await
+        .unwrap();
+    let batches = RecordBatches::try_collect(stream).await.unwrap();
+    let expected = "\
++-------+---------+---------------------+
+| tag_0 | field_0 | ts                  |
++-------+---------+---------------------+
+| 5     | 5.0     | 1970-01-01T00:00:05 |
+| 6     | 6.0     | 1970-01-01T00:00:06 |
+| 7     | 7.0     | 1970-01-01T00:00:07 |
+| 8     | 8.0     | 1970-01-01T00:00:08 |
+| 9     | 9.0     | 1970-01-01T00:00:09 |
++-------+---------+---------------------+";
+    assert_eq!(expected, batches.pretty_print().unwrap());
+}
diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs
index bd8f8a18b0..bae772db9d 100644
--- a/src/mito2/src/memtable/time_series.rs
+++ b/src/mito2/src/memtable/time_series.rs
@@ -258,7 +258,7 @@ impl Memtable for TimeSeriesMemtable {
         let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
         MemtableStats {
             estimated_bytes,
-            time_range: Some((max_timestamp, min_timestamp)),
+            time_range: Some((min_timestamp, max_timestamp)),
         }
     }
 }
@@ -1047,6 +1047,16 @@ mod tests {
             .map(|v| v.unwrap().0.value())
             .collect::<HashSet<_>>();
         assert_eq!(expected_ts, read);
+
+        let stats = memtable.stats();
+        assert!(stats.bytes_allocated() > 0);
+        assert_eq!(
+            Some((
+                Timestamp::new_millisecond(0),
+                Timestamp::new_millisecond(99)
+            )),
+            stats.time_range()
+        );
     }
 
     #[test]
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index 67353c90dd..2238573248 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -157,10 +157,22 @@ impl ScanRegion {
         }
 
         let memtables = self.version.memtables.list_memtables();
-        // Skip empty memtables.
+        // Skip empty memtables and memtables out of time range.
         let memtables: Vec<_> = memtables
             .into_iter()
-            .filter(|mem| !mem.is_empty())
+            .filter(|mem| {
+                if mem.is_empty() {
+                    return false;
+                }
+                let stats = mem.stats();
+                let Some((start, end)) = stats.time_range() else {
+                    return true;
+                };
+
+                // The time range of the memtable is inclusive.
+                let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
+                memtable_range.intersects(&time_range)
+            })
             .collect();
 
         debug!(