test: test mito filter delete

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-10-29 12:03:27 +08:00
parent 98752f4b47
commit 6e430b3898
2 changed files with 112 additions and 1 deletions

View File

@@ -22,8 +22,10 @@ use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::test_util::batch_util::sort_batches_and_print;
use crate::test_util::{
CreateRequestBuilder, TestEnv, build_rows, flush_region, put_rows, rows_schema,
CreateRequestBuilder, TestEnv, build_delete_rows, build_rows, delete_rows, delete_rows_schema,
flush_region, put_rows, rows_schema,
};
async fn check_prune_row_groups(exprs: Vec<Expr>, expected: &str, flat_format: bool) {
@@ -377,3 +379,99 @@ async fn test_mem_range_prune_with_format(flat_format: bool) {
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_scan_filter_field_after_delete() {
test_scan_filter_field_after_delete_with_format(false).await;
test_scan_filter_field_after_delete_with_format(true).await;
}
async fn test_scan_filter_field_after_delete_with_format(flat_format: bool) {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request.clone()))
.await
.unwrap();
// put 1, 2, 3, 4 and flush
put_rows(
&engine,
region_id,
Rows {
schema: column_schemas,
rows: build_rows(1, 5),
},
)
.await;
flush_region(&engine, region_id, None).await;
// delete 2, 3
let delete_schemas = delete_rows_schema(&request);
delete_rows(
&engine,
region_id,
Rows {
schema: delete_schemas,
rows: build_delete_rows(2, 4),
},
)
.await;
// Scans and filter fields, the field should be deleted.
let request = ScanRequest {
filters: vec![col("field_0").eq(lit(3.0f64))],
..Default::default()
};
let stream = engine
.scan_to_stream(region_id, request.clone())
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+----+
| tag_0 | field_0 | ts |
+-------+---------+----+
+-------+---------+----+";
assert_eq!(
expected,
sort_batches_and_print(&batches, &["tag_0", "field_0", "ts"])
);
// flush delete op
flush_region(&engine, region_id, None).await;
let stream = engine
.scan_to_stream(region_id, request.clone())
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(
expected,
sort_batches_and_print(&batches, &["tag_0", "field_0", "ts"])
);
}

View File

@@ -1068,6 +1068,19 @@ pub fn build_rows(start: usize, end: usize) -> Vec<Row> {
.collect()
}
/// Build rows with schema (string, ts_millis) in range `[start, end)`.
/// `start`, `end` are in second resolution.
pub fn build_delete_rows(start: usize, end: usize) -> Vec<Row> {
(start..end)
.map(|i| {
row(vec![
ValueData::StringValue(i.to_string()),
ValueData::TimestampMillisecondValue(i as i64 * 1000),
])
})
.collect()
}
/// Build rows with schema (string, f64, f64, ts_millis).
/// - `key`: A string key that is common across all rows.
/// - `timestamps`: Array of timestamp values.