feat: implement prefilter framework and primary key prefilter (#7862)
* feat: prefilter basic framework
* refactor: move arguments to RowGroupBuildContext
* refactor: skip prefiltered exprs in FlatPruneReader
* refactor: remove unused functions
* chore: update comment
* feat: handle partition columns in prefilter
* chore: fix clippy
* fix: apply prefiltered selection by and_then
* chore: fix clippy
* fix: handle last row cache
* fix: don't ignore error in PrimaryKeyFilter

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
@@ -48,7 +48,7 @@ use crate::error::{ArrowComputeSnafu, Result, ToArrowScalarSnafu, UnsupportedOpe
 ///
 /// This struct contains normalized predicate expr. In the form of
 /// `col` `op` `literal` where the `col` is provided from input.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct SimpleFilterEvaluator {
     /// Name of the referenced column.
     column_name: String,
@@ -246,18 +246,18 @@ fn bench_primary_key_filter(c: &mut Criterion) {

     let dense_pk = encode_dense_pk(&metadata, &row);
     let dense_codec = DensePrimaryKeyCodec::new(&metadata);
-    let mut dense_fast = dense_codec.primary_key_filter(&metadata, filters.clone());
+    let mut dense_fast = dense_codec.primary_key_filter(&metadata, filters.clone(), false);
     let mut dense_offsets = Vec::new();

     let sparse_pk = encode_sparse_pk(&metadata, &row);
     let sparse_codec = SparsePrimaryKeyCodec::new(&metadata);
-    let mut sparse_fast = sparse_codec.primary_key_filter(&metadata, filters.clone());
+    let mut sparse_fast = sparse_codec.primary_key_filter(&metadata, filters.clone(), false);
     let mut sparse_offsets = std::collections::HashMap::new();

     let mut group = c.benchmark_group(format!("primary_key_filter/{case_name}"));

     group.bench_function("dense/fast", |b| {
-        b.iter(|| black_box(dense_fast.matches(black_box(&dense_pk))))
+        b.iter(|| black_box(dense_fast.matches(black_box(&dense_pk)).unwrap()))
     });
     group.bench_function("dense/scalar", |b| {
         b.iter(|| {

@@ -272,7 +272,7 @@ fn bench_primary_key_filter(c: &mut Criterion) {
     });

     group.bench_function("sparse/fast", |b| {
-        b.iter(|| black_box(sparse_fast.matches(black_box(&sparse_pk))))
+        b.iter(|| black_box(sparse_fast.matches(black_box(&sparse_pk)).unwrap()))
     });
     group.bench_function("sparse/scalar", |b| {
         b.iter(|| {
@@ -72,6 +72,14 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Failed to evaluate filter"))]
+    EvaluateFilter {
+        #[snafu(source(from(common_recordbatch::error::Error, Box::new)))]
+        source: Box<common_recordbatch::error::Error>,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }

 pub type Result<T, E = Error> = std::result::Result<T, E>;

@@ -86,6 +94,7 @@ impl ErrorExt for Error {
                 StatusCode::InvalidArguments
             }
             NotSupportedField { .. } | UnsupportedOperation { .. } => StatusCode::Unsupported,
+            EvaluateFilter { source, .. } => source.status_code(),
         }
     }
@@ -20,11 +20,12 @@ use common_recordbatch::filter::SimpleFilterEvaluator;
 use datatypes::data_type::ConcreteDataType;
 use datatypes::value::Value;
 use memcomparable::Serializer;
+use snafu::ResultExt;
 use store_api::metadata::RegionMetadataRef;
 use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
 use store_api::storage::ColumnId;

-use crate::error::Result;
+use crate::error::{EvaluateFilterSnafu, Result};
 use crate::row_converter::{
     DensePrimaryKeyCodec, PrimaryKeyFilter, SortField, SparsePrimaryKeyCodec,
 };
@@ -41,8 +42,12 @@ struct PrimaryKeyFilterInner {
 }

 impl PrimaryKeyFilterInner {
-    fn new(metadata: RegionMetadataRef, filters: Arc<Vec<SimpleFilterEvaluator>>) -> Self {
-        let compiled_filters = Self::compile_filters(&metadata, &filters);
+    fn new(
+        metadata: RegionMetadataRef,
+        filters: Arc<Vec<SimpleFilterEvaluator>>,
+        skip_partition_column: bool,
+    ) -> Self {
+        let compiled_filters = Self::compile_filters(&metadata, &filters, skip_partition_column);
         Self {
             filters,
             compiled_filters,

@@ -52,6 +57,7 @@ impl PrimaryKeyFilterInner {
     fn compile_filters(
         metadata: &RegionMetadataRef,
         filters: &[SimpleFilterEvaluator],
+        skip_partition_column: bool,
     ) -> Vec<CompiledPrimaryKeyFilter> {
         if filters.is_empty() || metadata.primary_key.is_empty() {
             return Vec::new();

@@ -59,7 +65,7 @@ impl PrimaryKeyFilterInner {

         let mut compiled_filters = Vec::with_capacity(filters.len());
         for (filter_idx, filter) in filters.iter().enumerate() {
-            if is_partition_column(filter.column_name()) {
+            if skip_partition_column && is_partition_column(filter.column_name()) {
                 continue;
             }
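A minimal sketch of the gating this hunk introduces, under simplified assumptions (`filters_to_compile` is a hypothetical helper, not this commit's API, and the `__table_id` literal stands in for the metric engine's partition column name): the partition column is now skipped only when the caller opts in, rather than unconditionally.

```rust
fn is_partition_column(name: &str) -> bool {
    // Assumption: the metric engine's partition column is `__table_id`.
    name == "__table_id"
}

fn filters_to_compile<'a>(columns: &[&'a str], skip_partition_column: bool) -> Vec<&'a str> {
    columns
        .iter()
        // Previously the partition column was always dropped here; now only
        // callers that already pruned partitions (the partition-tree
        // memtable) pass `skip_partition_column = true`.
        .filter(|name| !(skip_partition_column && is_partition_column(name)))
        .copied()
        .collect()
}
```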
@@ -91,43 +97,36 @@ impl PrimaryKeyFilterInner {
         compiled_filters
     }

-    fn evaluate_filters<'a>(&self, accessor: &mut impl PrimaryKeyValueAccessor<'a>) -> bool {
+    fn evaluate_filters<'a>(
+        &self,
+        accessor: &mut impl PrimaryKeyValueAccessor<'a>,
+    ) -> Result<bool> {
         if self.compiled_filters.is_empty() {
-            return true;
+            return Ok(true);
         }

         for compiled in &self.compiled_filters {
             let filter = &self.filters[compiled.filter_idx];

             let passed = if let Some(fast_path) = &compiled.fast_path {
-                let encoded_value = match accessor.encoded_value(compiled) {
-                    Ok(v) => v,
-                    Err(e) => {
-                        common_telemetry::error!(e; "Failed to decode primary key");
-                        return true;
-                    }
-                };
+                let encoded_value = accessor.encoded_value(compiled)?;
                 fast_path.matches(encoded_value)
             } else {
-                let value = match accessor.decode_value(compiled) {
-                    Ok(v) => v,
-                    Err(e) => {
-                        common_telemetry::error!(e; "Failed to decode primary key");
-                        return true;
-                    }
-                };
+                let value = accessor.decode_value(compiled)?;

                 // Safety: arrow schema and datatypes are constructed from the same source.
                 let scalar_value = value.try_to_scalar_value(&compiled.data_type).unwrap();
-                filter.evaluate_scalar(&scalar_value).unwrap_or(true)
+                filter
+                    .evaluate_scalar(&scalar_value)
+                    .context(EvaluateFilterSnafu)?
            };

             if !passed {
-                return false;
+                return Ok(false);
             }
         }

-        true
+        Ok(true)
     }
 }
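The behavioral core of this hunk: a decode failure used to be logged and swallowed, returning `true` so the key was kept; it now propagates to the caller. A minimal, self-contained sketch of the pattern with hypothetical `DecodeError` and `AlwaysEven` types (not part of this commit):

```rust
#[derive(Debug)]
struct DecodeError(String);

trait PrimaryKeyFilter {
    fn matches(&mut self, pk: &[u8]) -> Result<bool, DecodeError>;
}

struct AlwaysEven;

impl PrimaryKeyFilter for AlwaysEven {
    fn matches(&mut self, pk: &[u8]) -> Result<bool, DecodeError> {
        // A malformed (here: empty) key is an error, not a silent match.
        let first = pk.first().ok_or_else(|| DecodeError("empty key".into()))?;
        Ok(*first % 2 == 0)
    }
}

// Callers forward the error with `?` instead of defaulting to "keep the key".
fn keep_key(filter: &mut dyn PrimaryKeyFilter, pk: &[u8]) -> Result<bool, DecodeError> {
    filter.matches(pk)
}
```

Surfacing the error makes the scan fail loudly instead of silently treating undecodable keys as matches.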
@@ -258,9 +257,10 @@ impl DensePrimaryKeyFilter {
         metadata: RegionMetadataRef,
         filters: Arc<Vec<SimpleFilterEvaluator>>,
         codec: DensePrimaryKeyCodec,
+        skip_partition_column: bool,
     ) -> Self {
         Self {
-            inner: PrimaryKeyFilterInner::new(metadata, filters),
+            inner: PrimaryKeyFilterInner::new(metadata, filters, skip_partition_column),
             codec,
             offsets_buf: Vec::new(),
         }

@@ -268,7 +268,7 @@ impl DensePrimaryKeyFilter {
 }

 impl PrimaryKeyFilter for DensePrimaryKeyFilter {
-    fn matches(&mut self, pk: &[u8]) -> bool {
+    fn matches(&mut self, pk: &[u8]) -> Result<bool> {
         self.offsets_buf.clear();
         let mut accessor = DensePrimaryKeyValueAccessor {
             pk,

@@ -311,9 +311,10 @@ impl SparsePrimaryKeyFilter {
         metadata: RegionMetadataRef,
         filters: Arc<Vec<SimpleFilterEvaluator>>,
         codec: SparsePrimaryKeyCodec,
+        skip_partition_column: bool,
     ) -> Self {
         Self {
-            inner: PrimaryKeyFilterInner::new(metadata, filters),
+            inner: PrimaryKeyFilterInner::new(metadata, filters, skip_partition_column),
             codec,
             offsets_map: HashMap::new(),
         }

@@ -321,7 +322,7 @@ impl SparsePrimaryKeyFilter {
 }

 impl PrimaryKeyFilter for SparsePrimaryKeyFilter {
-    fn matches(&mut self, pk: &[u8]) -> bool {
+    fn matches(&mut self, pk: &[u8]) -> Result<bool> {
         self.offsets_map.clear();
         let mut accessor = SparsePrimaryKeyValueAccessor {
             pk,
@@ -369,6 +370,7 @@ mod tests {
     use datatypes::schema::ColumnSchema;
     use datatypes::value::{OrderedFloat, ValueRef};
     use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
+    use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
    use store_api::storage::{ColumnId, RegionId};

     use super::*;

@@ -423,6 +425,36 @@ mod tests {
         Arc::new(metadata)
     }

+    fn setup_partitioned_metadata() -> RegionMetadataRef {
+        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
+        builder
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
+                    ConcreteDataType::uint32_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Tag,
+                column_id: 10,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new("tag", ConcreteDataType::string_datatype(), true),
+                semantic_type: SemanticType::Tag,
+                column_id: 1,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    greptime_timestamp(),
+                    ConcreteDataType::timestamp_nanosecond_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Timestamp,
+                column_id: 2,
+            })
+            .primary_key(vec![10, 1]);
+        Arc::new(builder.build().unwrap())
+    }
+
     fn create_test_row() -> Vec<(ColumnId, ValueRef<'static>)> {
         vec![
             (1, ValueRef::String("greptime-frontend-6989d9899-22222")),
@@ -479,8 +511,8 @@ mod tests {
         )]);
         let pk = encode_sparse_pk(&metadata, create_test_row());
         let codec = SparsePrimaryKeyCodec::new(&metadata);
-        let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec);
-        assert!(filter.matches(&pk));
+        let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec, false);
+        assert!(filter.matches(&pk).unwrap());
     }

     #[test]

@@ -492,8 +524,8 @@ mod tests {
         )]);
         let pk = encode_sparse_pk(&metadata, create_test_row());
         let codec = SparsePrimaryKeyCodec::new(&metadata);
-        let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec);
-        assert!(!filter.matches(&pk));
+        let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec, false);
+        assert!(!filter.matches(&pk).unwrap());
     }

     #[test]

@@ -505,8 +537,8 @@ mod tests {
         )]);
         let pk = encode_sparse_pk(&metadata, create_test_row());
         let codec = SparsePrimaryKeyCodec::new(&metadata);
-        let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec);
-        assert!(filter.matches(&pk));
+        let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec, false);
+        assert!(filter.matches(&pk).unwrap());
     }

     #[test]

@@ -518,8 +550,8 @@ mod tests {
         )]);
         let pk = encode_dense_pk(&metadata, create_test_row());
         let codec = DensePrimaryKeyCodec::new(&metadata);
-        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);
-        assert!(filter.matches(&pk));
+        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, false);
+        assert!(filter.matches(&pk).unwrap());
     }

     #[test]

@@ -531,8 +563,8 @@ mod tests {
         )]);
         let pk = encode_dense_pk(&metadata, create_test_row());
         let codec = DensePrimaryKeyCodec::new(&metadata);
-        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);
-        assert!(!filter.matches(&pk));
+        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, false);
+        assert!(!filter.matches(&pk).unwrap());
     }

     #[test]

@@ -544,8 +576,8 @@ mod tests {
         )]);
         let pk = encode_dense_pk(&metadata, create_test_row());
         let codec = DensePrimaryKeyCodec::new(&metadata);
-        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);
-        assert!(filter.matches(&pk));
+        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, false);
+        assert!(filter.matches(&pk).unwrap());
     }

     #[test]

@@ -563,8 +595,9 @@ mod tests {

         for (op, value, expected) in cases {
             let filters = Arc::new(vec![create_filter_with_op("pod", op, value)]);
-            let mut filter = DensePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone());
-            assert_eq!(expected, filter.matches(&pk));
+            let mut filter =
+                DensePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone(), false);
+            assert_eq!(expected, filter.matches(&pk).unwrap());
         }
     }

@@ -583,8 +616,9 @@ mod tests {

         for (op, value, expected) in cases {
             let filters = Arc::new(vec![create_filter_with_op("pod", op, value)]);
-            let mut filter = SparsePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone());
-            assert_eq!(expected, filter.matches(&pk));
+            let mut filter =
+                SparsePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone(), false);
+            assert_eq!(expected, filter.matches(&pk).unwrap());
         }
     }

@@ -616,8 +650,52 @@ mod tests {
             .unwrap();

         let filters = Arc::new(vec![create_filter_with_op("f", Operator::Eq, 0.0_f64)]);
-        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);
+        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, false);

-        assert!(filter.matches(&pk));
+        assert!(filter.matches(&pk).unwrap());
     }
+
+    #[test]
+    fn test_dense_primary_key_filter_matches_partition_column_by_default() {
+        let metadata = setup_partitioned_metadata();
+        let codec = DensePrimaryKeyCodec::new(&metadata);
+        let mut pk = Vec::new();
+        codec
+            .encode_to_vec(
+                [ValueRef::UInt32(42), ValueRef::String("host-a")].into_iter(),
+                &mut pk,
+            )
+            .unwrap();
+
+        let filters = Arc::new(vec![create_filter_with_op(
+            DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
+            Operator::Eq,
+            42_u32,
+        )]);
+        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, false);
+
+        assert!(filter.matches(&pk).unwrap());
+    }
+
+    #[test]
+    fn test_dense_primary_key_filter_can_skip_partition_column() {
+        let metadata = setup_partitioned_metadata();
+        let codec = DensePrimaryKeyCodec::new(&metadata);
+        let mut pk = Vec::new();
+        codec
+            .encode_to_vec(
+                [ValueRef::UInt32(42), ValueRef::String("host-a")].into_iter(),
+                &mut pk,
+            )
+            .unwrap();
+
+        let filters = Arc::new(vec![create_filter_with_op(
+            DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
+            Operator::Eq,
+            7_u32,
+        )]);
+        let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, true);
+
+        assert!(filter.matches(&pk).unwrap());
+    }
 }
@@ -53,7 +53,7 @@ pub trait PrimaryKeyCodecExt {

 pub trait PrimaryKeyFilter: Send + Sync {
     /// Returns true if the primary key matches the filter.
-    fn matches(&mut self, pk: &[u8]) -> bool;
+    fn matches(&mut self, pk: &[u8]) -> Result<bool>;
 }

 /// Composite values decoded from primary key bytes.

@@ -120,6 +120,7 @@ pub trait PrimaryKeyCodec: Send + Sync + Debug {
         &self,
         metadata: &RegionMetadataRef,
         filters: Arc<Vec<SimpleFilterEvaluator>>,
+        skip_partition_column: bool,
     ) -> Box<dyn PrimaryKeyFilter>;

     /// Returns the estimated size of the primary key.
@@ -556,11 +556,13 @@ impl PrimaryKeyCodec for DensePrimaryKeyCodec {
         &self,
         metadata: &RegionMetadataRef,
         filters: Arc<Vec<SimpleFilterEvaluator>>,
+        skip_partition_column: bool,
     ) -> Box<dyn PrimaryKeyFilter> {
         Box::new(DensePrimaryKeyFilter::new(
             metadata.clone(),
             filters,
             self.clone(),
+            skip_partition_column,
         ))
     }

@@ -357,11 +357,13 @@ impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
         &self,
         metadata: &RegionMetadataRef,
         filters: Arc<Vec<SimpleFilterEvaluator>>,
+        skip_partition_column: bool,
     ) -> Box<dyn PrimaryKeyFilter> {
         Box::new(SparsePrimaryKeyFilter::new(
             metadata.clone(),
             filters,
             self.clone(),
+            skip_partition_column,
         ))
     }
@@ -13,12 +13,15 @@
 // limitations under the License.

 use api::v1::Rows;
+use common_base::readable_size::ReadableSize;
 use common_recordbatch::RecordBatches;
+use datafusion_expr::{col, lit};
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::RegionRequest;
 use store_api::storage::{RegionId, ScanRequest, TimeSeriesRowSelector};

+use crate::config::MitoConfig;
 use crate::engine::MitoEngine;
 use crate::test_util::batch_util::sort_batches_and_print;
 use crate::test_util::{
     CreateRequestBuilder, TestEnv, build_rows_for_key, flush_region, put_rows, rows_schema,

@@ -107,6 +110,27 @@ async fn test_last_row(append_mode: bool, flat_format: bool) {
     assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));
 }

+async fn scan_last_row(
+    engine: &MitoEngine,
+    region_id: RegionId,
+    filters: Vec<datafusion_expr::Expr>,
+) -> String {
+    let scanner = engine
+        .scanner(
+            region_id,
+            ScanRequest {
+                filters,
+                series_row_selector: Some(TimeSeriesRowSelector::LastRow),
+                ..Default::default()
+            },
+        )
+        .await
+        .unwrap();
+    let stream = scanner.scan().await.unwrap();
+    let batches = RecordBatches::try_collect(stream).await.unwrap();
+    sort_batches_and_print(&batches, &["tag_0", "ts"])
+}
+
 #[tokio::test]
 async fn test_last_row_append_mode_disabled() {
     test_last_row(false, false).await;
@@ -126,3 +150,69 @@ async fn test_last_row_flat_format_append_mode_disabled() {
 async fn test_last_row_flat_format_append_mode_enabled() {
     test_last_row(true, true).await;
 }
+
+#[tokio::test]
+async fn test_last_row_flat_format_prefilter_does_not_poison_selector_cache() {
+    let mut env = TestEnv::new().await;
+    let engine = env
+        .create_engine(MitoConfig {
+            selector_result_cache_size: ReadableSize::mb(1),
+            ..Default::default()
+        })
+        .await;
+    let region_id = RegionId::new(1, 1);
+
+    env.get_schema_metadata_manager()
+        .register_region_table_info(
+            region_id.table_id(),
+            "test_table",
+            "test_catalog",
+            "test_schema",
+            None,
+            env.get_kv_backend(),
+        )
+        .await;
+
+    let request = CreateRequestBuilder::new()
+        .insert_option("sst_format", "flat")
+        .build();
+    let column_schemas = rows_schema(&request);
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+
+    let rows = Rows {
+        schema: column_schemas,
+        rows: [
+            build_rows_for_key("a", 0, 3, 0),
+            build_rows_for_key("b", 0, 3, 10),
+        ]
+        .concat(),
+    };
+    put_rows(&engine, region_id, rows).await;
+    flush_region(&engine, region_id, Some(16)).await;
+
+    let filtered = scan_last_row(&engine, region_id, vec![col("tag_0").eq(lit("a"))]).await;
+    assert_eq!(
+        "\
++-------+---------+---------------------+
+| tag_0 | field_0 | ts                  |
++-------+---------+---------------------+
+| a     | 2.0     | 1970-01-01T00:00:02 |
++-------+---------+---------------------+",
+        filtered
+    );
+
+    let unfiltered = scan_last_row(&engine, region_id, vec![]).await;
+    assert_eq!(
+        "\
++-------+---------+---------------------+
+| tag_0 | field_0 | ts                  |
++-------+---------+---------------------+
+| a     | 2.0     | 1970-01-01T00:00:02 |
+| b     | 12.0    | 1970-01-01T00:00:02 |
++-------+---------+---------------------+",
+        unfiltered
+    );
+}
@@ -367,6 +367,7 @@ fn apply_combined_filters(
     let predicate_mask = context.base.compute_filter_mask_flat(
         &record_batch,
         skip_fields,
+        false,
         &mut tag_decode_state,
     )?;
     // If predicate filters out the entire batch, return None early
@@ -152,7 +152,8 @@ impl Partition {
         filters: &Arc<Vec<SimpleFilterEvaluator>>,
     ) -> Option<Box<dyn PrimaryKeyFilter>> {
         if need_prune_key {
-            let filter = row_codec.primary_key_filter(metadata, filters.clone());
+            // TODO(yingwen): Remove `skip_partition_column` after dropping PartitionTreeMemtable.
+            let filter = row_codec.primary_key_filter(metadata, filters.clone(), true);
             Some(filter)
         } else {
             None
@@ -19,9 +19,10 @@ use std::time::{Duration, Instant};

 use mito_codec::key_values::KeyValue;
 use mito_codec::row_converter::PrimaryKeyFilter;
+use snafu::ResultExt;
 use store_api::metadata::RegionMetadataRef;

-use crate::error::Result;
+use crate::error::{DecodeSnafu, Result};
 use crate::memtable::partition_tree::data::{
     DATA_INIT_CAP, DataBatch, DataParts, DataPartsReader, DataPartsReaderBuilder,
 };

@@ -243,7 +244,7 @@ impl ShardReader {
             // Safety: `key_filter` is some so the shard has primary keys.
             let key = self.key_dict.as_ref().unwrap().key_by_pk_index(pk_index);
             let now = Instant::now();
-            if key_filter.matches(key) {
+            if key_filter.matches(key).context(DecodeSnafu)? {
                 self.prune_pk_cost += now.elapsed();
                 self.last_yield_pk_index = Some(pk_index);
                 self.keys_after_pruning += 1;
@@ -20,9 +20,10 @@ use std::time::{Duration, Instant};

 use mito_codec::key_values::KeyValue;
 use mito_codec::row_converter::PrimaryKeyFilter;
+use snafu::ResultExt;
 use store_api::metadata::RegionMetadataRef;

-use crate::error::Result;
+use crate::error::{DecodeSnafu, Result};
 use crate::memtable::partition_tree::data::{
     DATA_INIT_CAP, DataBatch, DataBuffer, DataBufferReader, DataBufferReaderBuilder, DataParts,
 };

@@ -281,7 +282,7 @@ impl ShardBuilderReader {
             self.keys_before_pruning += 1;
             let key = self.dict_reader.key_by_pk_index(pk_index);
             let now = Instant::now();
-            if key_filter.matches(key) {
+            if key_filter.matches(key).context(DecodeSnafu)? {
                 self.prune_pk_cost += now.elapsed();
                 self.last_yield_pk_index = Some(pk_index);
                 self.keys_after_pruning += 1;
@@ -142,6 +142,7 @@ mod tests {
     use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
     use crate::sst::parquet::flat_format::FlatWriteFormat;
     use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
+    use crate::sst::parquet::row_selection::RowGroupSelection;
     use crate::sst::parquet::writer::ParquetWriter;
     use crate::sst::{
         DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,

@@ -1113,6 +1114,39 @@ mod tests {
         assert!(reader.next_record_batch().await.unwrap().is_none());
     }

+    fn new_record_batch_from_rows(rows: &[(&str, &str, i64)]) -> RecordBatch {
+        let metadata = Arc::new(sst_region_metadata());
+        let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
+
+        let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
+        let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();
+        let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
+        let mut field_values = Vec::with_capacity(rows.len());
+        let mut timestamps = Vec::with_capacity(rows.len());
+
+        for (tag_0, tag_1, ts) in rows {
+            tag_0_builder.append_value(*tag_0);
+            tag_1_builder.append_value(*tag_1);
+            pk_builder.append(new_primary_key(&[tag_0, tag_1])).unwrap();
+            field_values.push(*ts as u64);
+            timestamps.push(*ts);
+        }
+
+        RecordBatch::try_new(
+            flat_schema,
+            vec![
+                Arc::new(tag_0_builder.finish()) as ArrayRef,
+                Arc::new(tag_1_builder.finish()) as ArrayRef,
+                Arc::new(UInt64Array::from(field_values)) as ArrayRef,
+                Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef,
+                Arc::new(pk_builder.finish()) as ArrayRef,
+                Arc::new(UInt64Array::from_value(1000, rows.len())) as ArrayRef,
+                Arc::new(UInt8Array::from_value(OpType::Put as u8, rows.len())) as ArrayRef,
+            ],
+        )
+        .unwrap()
+    }
+
     /// Creates a flat format RecordBatch for testing with sparse primary key encoding.
     /// Similar to `new_record_batch_by_range` but without individual primary key columns.
     fn new_record_batch_by_range_sparse(
@@ -1642,6 +1676,133 @@ mod tests {
         assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
     }

+    #[tokio::test]
+    async fn test_reader_prefilter_with_outer_selection_and_trailing_filtered_rows() {
+        let mut env = TestEnv::new().await;
+        let object_store = env.init_object_store_manager();
+        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
+        let metadata = Arc::new(sst_region_metadata());
+        let row_group_size = 10;
+
+        let flat_source = new_flat_source_from_record_batches(vec![
+            new_record_batch_by_range(&["a", "d"], 0, 3),
+            new_record_batch_by_range(&["b", "d"], 3, 10),
+        ]);
+        let write_opts = WriteOptions {
+            row_group_size,
+            ..Default::default()
+        };
+        let indexer_builder = create_test_indexer_builder(
+            &env,
+            object_store.clone(),
+            file_path.clone(),
+            metadata.clone(),
+            row_group_size,
+        );
+        let info = write_flat_sst(
+            object_store.clone(),
+            metadata.clone(),
+            indexer_builder,
+            file_path,
+            flat_source,
+            &write_opts,
+        )
+        .await;
+        let handle = create_file_handle_from_sst_info(&info, &metadata);
+
+        let builder =
+            ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store)
+                .flat_format(true)
+                .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))])));
+
+        let mut metrics = ReaderMetrics::default();
+        let (context, _) = builder
+            .build_reader_input(&mut metrics)
+            .await
+            .unwrap()
+            .unwrap();
+        let selection = RowGroupSelection::from_row_ranges(
+            vec![(0, std::iter::once(0..6).collect())],
+            row_group_size,
+        );
+
+        let mut reader = ParquetReader::new(Arc::new(context), selection)
+            .await
+            .unwrap();
+        check_record_batch_reader_result(
+            &mut reader,
+            &[new_record_batch_by_range(&["a", "d"], 0, 3)],
+        )
+        .await;
+    }
+
+    #[tokio::test]
+    async fn test_reader_prefilter_with_outer_selection_disjoint_matches_and_trailing_gap() {
+        let mut env = TestEnv::new().await;
+        let object_store = env.init_object_store_manager();
+        let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
+        let metadata = Arc::new(sst_region_metadata());
+        let row_group_size = 8;
+
+        let flat_source = new_flat_source_from_record_batches(vec![
+            new_record_batch_by_range(&["a", "d"], 0, 2),
+            new_record_batch_by_range(&["b", "d"], 2, 4),
+            new_record_batch_by_range(&["a", "d"], 4, 6),
+            new_record_batch_by_range(&["c", "d"], 6, 8),
+        ]);
+        let write_opts = WriteOptions {
+            row_group_size,
+            ..Default::default()
+        };
+        let indexer_builder = create_test_indexer_builder(
+            &env,
+            object_store.clone(),
+            file_path.clone(),
+            metadata.clone(),
+            row_group_size,
+        );
+        let info = write_flat_sst(
+            object_store.clone(),
+            metadata.clone(),
+            indexer_builder,
+            file_path,
+            flat_source,
+            &write_opts,
+        )
+        .await;
+        let handle = create_file_handle_from_sst_info(&info, &metadata);
+
+        let builder =
+            ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store)
+                .flat_format(true)
+                .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))])));
+
+        let mut metrics = ReaderMetrics::default();
+        let (context, _) = builder
+            .build_reader_input(&mut metrics)
+            .await
+            .unwrap()
+            .unwrap();
+        let selection = RowGroupSelection::from_row_ranges(
+            vec![(0, std::iter::once(0..8).collect())],
+            row_group_size,
+        );
+
+        let mut reader = ParquetReader::new(Arc::new(context), selection)
+            .await
+            .unwrap();
+        check_record_batch_reader_result(
+            &mut reader,
+            &[new_record_batch_from_rows(&[
+                ("a", "d", 0),
+                ("a", "d", 1),
+                ("a", "d", 4),
+                ("a", "d", 5),
+            ])],
+        )
+        .await;
+    }
+
     #[tokio::test]
     async fn test_write_flat_read_with_inverted_index_sparse() {
         common_telemetry::init_default_ut_logging();
@@ -37,6 +37,7 @@ use store_api::metadata::RegionMetadataRef;
 use store_api::storage::{ColumnId, TimeSeriesRowSelector};
 use table::predicate::Predicate;

+use crate::cache::CacheStrategy;
 use crate::error::{
     ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu,
     EvalPartitionFilterSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu,

@@ -53,7 +54,8 @@ use crate::sst::parquet::flat_format::{
 };
 use crate::sst::parquet::format::ReadFormat;
 use crate::sst::parquet::reader::{
-    FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
+    FlatRowGroupReader, MaybeFilter, RowGroupBuildContext, RowGroupReader, RowGroupReaderBuilder,
+    SimpleFilterContext,
 };
 use crate::sst::parquet::row_group::ParquetFetchMetrics;
 use crate::sst::parquet::stats::RowGroupPruningStats;
@@ -181,14 +183,17 @@ impl FileRange {
         if !self.in_dynamic_filter_range() {
             return Ok(None);
         }
+        // Compute skip_fields once for this row group
+        let skip_fields = self.context.should_skip_fields(self.row_group_idx);
         let parquet_reader = self
             .context
             .reader_builder
-            .build(
+            .build(self.context.build_context(
                 self.row_group_idx,
                 self.row_selection.clone(),
                 fetch_metrics,
-            )
+                skip_fields,
+            ))
             .await?;

         let use_last_row_reader = if selector

@@ -210,9 +215,6 @@ impl FileRange {
             false
         };

-        // Compute skip_fields once for this row group
-        let skip_fields = self.context.should_skip_fields(self.row_group_idx);
-
         let prune_reader = if use_last_row_reader {
             // Row group is PUT only, use LastRowReader to skip unnecessary rows.
             let reader = RowGroupLastRowCachedReader::new(

@@ -243,14 +245,17 @@ impl FileRange {
         if !self.in_dynamic_filter_range() {
             return Ok(None);
         }
+        // Compute skip_fields once for this row group
+        let skip_fields = self.context.should_skip_fields(self.row_group_idx);
         let parquet_reader = self
             .context
             .reader_builder
-            .build(
+            .build(self.context.build_context(
                 self.row_group_idx,
                 self.row_selection.clone(),
                 fetch_metrics,
-            )
+                skip_fields,
+            ))
             .await?;

         let use_last_row_reader = if selector
@@ -271,16 +276,20 @@ impl FileRange {
             false
         };

-        // Compute skip_fields once for this row group
-        let skip_fields = self.context.should_skip_fields(self.row_group_idx);
-
         let flat_prune_reader = if use_last_row_reader {
             let flat_row_group_reader =
                 FlatRowGroupReader::new(self.context.clone(), parquet_reader);
+            // Flat PK prefilter makes the input stream predicate-dependent, so cached
+            // selector results are not reusable across queries with different filters.
+            let cache_strategy = if self.context.reader_builder.has_flat_primary_key_prefilter() {
+                CacheStrategy::Disabled
+            } else {
+                self.context.reader_builder.cache_strategy().clone()
+            };
             let reader = FlatRowGroupLastRowCachedReader::new(
                 self.file_handle().file_id().file_id(),
                 self.row_group_idx,
-                self.context.reader_builder.cache_strategy().clone(),
+                cache_strategy,
                 self.context.read_format().projection_indices(),
                 flat_row_group_reader,
             );
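A compact sketch of the cache decision above, with simplified stand-in types (not mito2's real `CacheStrategy` API): the selector-result cache is keyed by file and row group but not by predicate, so once the PK prefilter makes the input stream predicate-dependent, reuse could return rows filtered by a previous query — the situation the `test_last_row_flat_format_prefilter_does_not_poison_selector_cache` test above guards against.

```rust
// Simplified stand-ins; the real types are mito2's CacheStrategy and the
// reader builder's `has_flat_primary_key_prefilter` flag.
#[derive(Clone)]
enum CacheStrategy {
    Enabled,
    Disabled,
}

fn selector_cache_strategy(has_pk_prefilter: bool, configured: &CacheStrategy) -> CacheStrategy {
    if has_pk_prefilter {
        // The cache key is (file, row group) and does not include the query
        // predicate, so caching predicate-dependent results would be unsound.
        CacheStrategy::Disabled
    } else {
        configured.clone()
    }
}
```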
@@ -387,7 +396,11 @@ impl FileRangeContext {
         input: RecordBatch,
         skip_fields: bool,
     ) -> Result<Option<RecordBatch>> {
-        self.base.precise_filter_flat(input, skip_fields)
+        self.base.precise_filter_flat(
+            input,
+            skip_fields,
+            self.reader_builder.has_flat_primary_key_prefilter(),
+        )
     }

     /// Determines whether to skip field filters based on PreFilterMode and row group delete status.

@@ -408,6 +421,23 @@ impl FileRangeContext {
         row_group_contains_delete(metadata, row_group_index, self.reader_builder.file_path())
     }

+    /// Creates a [RowGroupBuildContext] for building row group readers with prefiltering.
+    pub(crate) fn build_context<'a>(
+        &'a self,
+        row_group_idx: usize,
+        row_selection: Option<RowSelection>,
+        fetch_metrics: Option<&'a ParquetFetchMetrics>,
+        skip_fields: bool,
+    ) -> RowGroupBuildContext<'a> {
+        RowGroupBuildContext {
+            filters: &self.base.filters,
+            skip_fields,
+            row_group_idx,
+            row_selection,
+            fetch_metrics,
+        }
+    }
+
     /// Returns the estimated memory size of this context.
     /// Mainly accounts for the parquet metadata size.
     pub(crate) fn memory_size(&self) -> usize {
@@ -600,9 +630,15 @@ impl RangeBase {
         &self,
         input: RecordBatch,
         skip_fields: bool,
+        skip_prefiltered_pk_filters: bool,
     ) -> Result<Option<RecordBatch>> {
         let mut tag_decode_state = TagDecodeState::new();
-        let mask = self.compute_filter_mask_flat(&input, skip_fields, &mut tag_decode_state)?;
+        let mask = self.compute_filter_mask_flat(
+            &input,
+            skip_fields,
+            skip_prefiltered_pk_filters,
+            &mut tag_decode_state,
+        )?;

         // If mask is None, the entire batch is filtered out
         let Some(mut mask) = mask else {

@@ -647,6 +683,7 @@ impl RangeBase {
         &self,
         input: &RecordBatch,
         skip_fields: bool,
+        skip_prefiltered_pk_filters: bool,
         tag_decode_state: &mut TagDecodeState,
     ) -> Result<Option<BooleanBuffer>> {
         let mut mask = BooleanBuffer::new_set(input.num_rows());

@@ -674,6 +711,12 @@ impl RangeBase {
                 continue;
             }

+            // Flat parquet PK prefiltering already applied these tag predicates while refining
+            // row selection, so skip them here to avoid decoding/evaluating the same condition twice.
+            if skip_prefiltered_pk_filters && filter_ctx.usable_primary_key_filter() {
+                continue;
+            }
+
             // Get the column directly by its projected index.
             // If the column is missing and it's not a tag/time column, this filter is skipped.
             // Assumes the projection indices align with the input batch schema.
@@ -926,3 +969,62 @@ impl RangeBase {
         RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use datafusion_expr::{col, lit};
+
+    use super::*;
+    use crate::sst::parquet::format::ReadFormat;
+    use crate::test_util::sst_util::{new_record_batch_with_custom_sequence, sst_region_metadata};
+
+    fn new_test_range_base(filters: Vec<SimpleFilterContext>) -> RangeBase {
+        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
+        let read_format = ReadFormat::new_flat(
+            metadata.clone(),
+            metadata.column_metadatas.iter().map(|c| c.column_id),
+            None,
+            "test",
+            true,
+        )
+        .unwrap();
+
+        RangeBase {
+            filters,
+            dyn_filters: vec![],
+            read_format,
+            expected_metadata: None,
+            prune_schema: metadata.schema.clone(),
+            codec: mito_codec::row_converter::build_primary_key_codec(metadata.as_ref()),
+            compat_batch: None,
+            compaction_projection_mapper: None,
+            pre_filter_mode: PreFilterMode::All,
+            partition_filter: None,
+        }
+    }
+
+    #[test]
+    fn test_compute_filter_mask_flat_skips_prefiltered_pk_filters() {
+        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
+        let filters = vec![
+            SimpleFilterContext::new_opt(&metadata, None, &col("tag_0").eq(lit("a"))).unwrap(),
+            SimpleFilterContext::new_opt(&metadata, None, &col("field_0").gt(lit(1_u64))).unwrap(),
+        ];
+        let base = new_test_range_base(filters);
+        let batch = new_record_batch_with_custom_sequence(&["b", "x"], 0, 4, 1);
+
+        let mask_without_skip = base
+            .compute_filter_mask_flat(&batch, false, false, &mut TagDecodeState::new())
+            .unwrap()
+            .unwrap();
+        assert_eq!(mask_without_skip.count_set_bits(), 0);
+
+        let mask_with_skip = base
+            .compute_filter_mask_flat(&batch, false, true, &mut TagDecodeState::new())
+            .unwrap()
+            .unwrap();
+        assert_eq!(mask_with_skip.count_set_bits(), 2);
+    }
+}
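The test above exercises the new `skip_prefiltered_pk_filters` flag. As a rough illustration of the intent, with simplified stand-in types rather than this commit's `SimpleFilterContext` API: when the prefilter phase has already applied a tag predicate while refining the row selection, re-evaluating it per batch is pure overhead, so the mask computation drops it.

```rust
// Simplified stand-in for this commit's SimpleFilterContext.
struct Filter {
    column: &'static str,
    usable_primary_key_filter: bool,
}

fn filters_for_mask(filters: &[Filter], skip_prefiltered_pk_filters: bool) -> Vec<&Filter> {
    filters
        .iter()
        // These tag predicates already refined the row selection during the
        // prefilter phase; evaluating them again per batch is redundant work.
        .filter(|f| !(skip_prefiltered_pk_filters && f.usable_primary_key_filter))
        .collect()
}
```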
@@ -282,6 +282,13 @@ impl FlatReadFormat {
         }
     }

+    /// Returns `true` if raw batches from parquet use the flat layout with a
+    /// dictionary-encoded `__primary_key` column (i.e., [`ParquetAdapter::Flat`]).
+    /// Returns `false` for the legacy primary-key-to-flat conversion path.
+    pub(crate) fn raw_batch_has_primary_key_dictionary(&self) -> bool {
+        matches!(&self.parquet_adapter, ParquetAdapter::Flat(_))
+    }
+
     /// Creates a sequence array to override.
     pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
         self.override_sequence
@@ -12,31 +12,40 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-//! Helpers for parquet prefiltering.
+//! Prefilter framework for parquet reader.
+//!
+//! Prefilter optimization reduces I/O by reading only a subset of columns first
+//! (the prefilter phase), applying filters to compute a refined row selection,
+//! then reading the remaining columns with the refined selection.

 use std::ops::Range;
 use std::sync::Arc;

-use api::v1::SemanticType;
 use common_recordbatch::filter::SimpleFilterEvaluator;
-use datatypes::arrow::array::{BinaryArray, BooleanArray};
+use datatypes::arrow::array::BinaryArray;
 use datatypes::arrow::record_batch::RecordBatch;
-use mito_codec::primary_key_filter::is_partition_column;
-use mito_codec::row_converter::PrimaryKeyFilter;
+use futures::StreamExt;
+use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
+use parquet::arrow::ProjectionMask;
+use parquet::arrow::arrow_reader::RowSelection;
+use parquet::schema::types::SchemaDescriptor;
 use snafu::{OptionExt, ResultExt};
 use store_api::metadata::{RegionMetadata, RegionMetadataRef};

-use crate::error::{ComputeArrowSnafu, Result, UnexpectedSnafu};
+use crate::error::{DecodeSnafu, ReadParquetSnafu, Result, UnexpectedSnafu};
 use crate::sst::parquet::flat_format::primary_key_column_index;
-use crate::sst::parquet::format::PrimaryKeyArray;
+use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
+use crate::sst::parquet::reader::{RowGroupBuildContext, RowGroupReaderBuilder};
+use crate::sst::parquet::row_selection::row_selection_from_row_ranges_exact;

-#[cfg_attr(not(test), allow(dead_code))]
 pub(crate) fn matching_row_ranges_by_primary_key(
     input: &RecordBatch,
+    pk_column_index: usize,
     pk_filter: &mut dyn PrimaryKeyFilter,
 ) -> Result<Vec<Range<usize>>> {
-    let primary_key_index = primary_key_column_index(input.num_columns());
     let pk_dict_array = input
-        .column(primary_key_index)
+        .column(pk_column_index)
         .as_any()
         .downcast_ref::<PrimaryKeyArray>()
         .context(UnexpectedSnafu {

@@ -65,7 +74,10 @@ pub(crate) fn matching_row_ranges_by_primary_key(
             end += 1;
         }

-        if pk_filter.matches(pk_values.value(key as usize)) {
+        if pk_filter
+            .matches(pk_values.value(key as usize))
+            .context(DecodeSnafu)?
+        {
             if let Some(last) = matched_row_ranges.last_mut()
                 && last.end == start
             {
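The loop patched above belongs to `matching_row_ranges_by_primary_key`, which walks the dictionary-encoded primary key column as runs of equal keys, evaluates the (cached) filter once per run, and merges adjacent matching runs. A self-contained sketch of that idea over plain slices — simplified types, not the actual arrow API:

```rust
fn matching_ranges(
    keys: &[u32],
    mut matches: impl FnMut(u32) -> bool,
) -> Vec<std::ops::Range<usize>> {
    let mut ranges: Vec<std::ops::Range<usize>> = Vec::new();
    let mut start = 0;
    while start < keys.len() {
        let key = keys[start];
        let mut end = start + 1;
        // Extend the run while the dictionary key stays the same.
        while end < keys.len() && keys[end] == key {
            end += 1;
        }
        if matches(key) {
            // Merge with the previous range when the runs are adjacent.
            match ranges.last_mut() {
                Some(last) if last.end == start => last.end = end,
                _ => ranges.push(start..end),
            }
        }
        start = end;
    }
    ranges
}
```

Evaluating per run rather than per row is what makes the prefilter cheap: a row group with millions of rows typically has only a handful of distinct primary keys.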
@@ -81,68 +93,15 @@ pub(crate) fn matching_row_ranges_by_primary_key(
     Ok(matched_row_ranges)
 }

-#[cfg_attr(not(test), allow(dead_code))]
-pub(crate) fn prefilter_flat_batch_by_primary_key(
-    input: RecordBatch,
-    pk_filter: &mut dyn PrimaryKeyFilter,
-) -> Result<Option<RecordBatch>> {
-    if input.num_rows() == 0 {
-        return Ok(Some(input));
-    }
-
-    let matched_row_ranges = matching_row_ranges_by_primary_key(&input, pk_filter)?;
-    if matched_row_ranges.is_empty() {
-        return Ok(None);
-    }
-
-    if matched_row_ranges.len() == 1
-        && matched_row_ranges[0].start == 0
-        && matched_row_ranges[0].end == input.num_rows()
-    {
-        return Ok(Some(input));
-    }
-
-    if matched_row_ranges.len() == 1 {
-        let span = &matched_row_ranges[0];
-        return Ok(Some(input.slice(span.start, span.end - span.start)));
-    }
-
-    let mut mask = vec![false; input.num_rows()];
-    for span in matched_row_ranges {
-        mask[span].fill(true);
-    }
-
-    let filtered =
-        datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
-            .context(ComputeArrowSnafu)?;
-    if filtered.num_rows() == 0 {
-        Ok(None)
-    } else {
-        Ok(Some(filtered))
-    }
-}
-
-#[cfg_attr(not(test), allow(dead_code))]
-pub(crate) fn retain_usable_primary_key_filters(
-    sst_metadata: &RegionMetadataRef,
-    expected_metadata: Option<&RegionMetadata>,
-    filters: &mut Vec<SimpleFilterEvaluator>,
-) {
-    filters.retain(|filter| is_usable_primary_key_filter(sst_metadata, expected_metadata, filter));
-}
-
-#[cfg_attr(not(test), allow(dead_code))]
+/// Returns whether a filter can be applied by parquet primary-key prefiltering.
+///
+/// Unlike `PartitionTreeMemtable`, parquet prefilter always supports predicates
+/// on the partition column.
 pub(crate) fn is_usable_primary_key_filter(
     sst_metadata: &RegionMetadataRef,
     expected_metadata: Option<&RegionMetadata>,
     filter: &SimpleFilterEvaluator,
 ) -> bool {
-    // TODO(yingwen): The primary key filter always skips the partition column. Consider using a flag
-    // to control this behavior. We can remove this behavior after we remove the PartitionTreeMemtable.
-    if is_partition_column(filter.column_name()) {
-        return false;
-    }
-
     let sst_column = match expected_metadata {
         Some(expected_metadata) => {
             let Some(expected_column) = expected_metadata.column_by_name(filter.column_name())
@@ -176,7 +135,6 @@ pub(crate) fn is_usable_primary_key_filter(
         .is_some()
 }

-#[cfg_attr(not(test), allow(dead_code))]
 pub(crate) struct CachedPrimaryKeyFilter {
     inner: Box<dyn PrimaryKeyFilter>,
     last_primary_key: Vec<u8>,

@@ -184,7 +142,6 @@ pub(crate) struct CachedPrimaryKeyFilter {
 }

 impl CachedPrimaryKeyFilter {
-    #[cfg_attr(not(test), allow(dead_code))]
     pub(crate) fn new(inner: Box<dyn PrimaryKeyFilter>) -> Self {
         Self {
             inner,
@@ -195,49 +152,191 @@ impl CachedPrimaryKeyFilter {
 }

 impl PrimaryKeyFilter for CachedPrimaryKeyFilter {
-    fn matches(&mut self, pk: &[u8]) -> bool {
+    fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
         if let Some(last_match) = self.last_match
             && self.last_primary_key == pk
         {
-            return last_match;
+            return Ok(last_match);
         }

-        let matched = self.inner.matches(pk);
+        let matched = self.inner.matches(pk)?;
         self.last_primary_key.clear();
         self.last_primary_key.extend_from_slice(pk);
         self.last_match = Some(matched);
-        matched
+        Ok(matched)
     }
 }

-#[cfg_attr(not(test), allow(dead_code))]
-pub(crate) fn batch_single_primary_key(batch: &RecordBatch) -> Result<Option<&[u8]>> {
-    let primary_key_index = primary_key_column_index(batch.num_columns());
-    let pk_dict_array = batch
-        .column(primary_key_index)
-        .as_any()
-        .downcast_ref::<PrimaryKeyArray>()
-        .context(UnexpectedSnafu {
-            reason: "Primary key column is not a dictionary array",
-        })?;
-    let pk_values = pk_dict_array
-        .values()
-        .as_any()
-        .downcast_ref::<BinaryArray>()
-        .context(UnexpectedSnafu {
-            reason: "Primary key values are not binary array",
-        })?;
-    let keys = pk_dict_array.keys();
-    if keys.is_empty() {
-        return Ok(None);
-    }
-
-    let first_key = keys.value(0);
-    if first_key != keys.value(keys.len() - 1) {
-        return Ok(None);
-    }
-
-    Ok(Some(pk_values.value(first_key as usize)))
-}
+/// Context for prefiltering a row group.
+///
+/// Currently supports primary key (PK) filtering only.
+/// Will be extended with simple column filters and physical filters in the future.
+pub(crate) struct PrefilterContext {
+    /// PK filter instance.
+    pk_filter: Box<dyn PrimaryKeyFilter>,
+    /// Projection mask for reading only the PK column.
+    pk_projection: ProjectionMask,
+    /// Index of the PK column within the prefilter projection batch.
+    /// This is 0 when we project only the PK column.
+    pk_column_index: usize,
+}
+
+/// Pre-built state for constructing [PrefilterContext] per row group.
+///
+/// Fields invariant across row groups (projection mask, codec, metadata, filters)
+/// are computed once. A fresh [PrefilterContext] with its own mutable PK filter
+/// is created via [PrefilterContextBuilder::build()] for each row group.
+pub(crate) struct PrefilterContextBuilder {
+    pk_projection: ProjectionMask,
+    pk_column_index: usize,
+    codec: Arc<dyn PrimaryKeyCodec>,
+    metadata: RegionMetadataRef,
+    pk_filters: Arc<Vec<SimpleFilterEvaluator>>,
+}
+
+impl PrefilterContextBuilder {
+    /// Creates a builder if prefiltering is applicable.
+    ///
+    /// Returns `None` if:
+    /// - No primary key filters are available
+    /// - The read format doesn't use flat layout with dictionary-encoded PKs
+    /// - The primary key is empty
+    pub(crate) fn new(
+        read_format: &ReadFormat,
+        codec: &Arc<dyn PrimaryKeyCodec>,
+        primary_key_filters: Option<&Arc<Vec<SimpleFilterEvaluator>>>,
+        parquet_schema: &SchemaDescriptor,
+    ) -> Option<Self> {
+        let pk_filters = primary_key_filters?;
+        if pk_filters.is_empty() {
+            return None;
+        }
+
+        let metadata = read_format.metadata();
+        if metadata.primary_key.is_empty() {
+            return None;
+        }
+
+        // Only flat format with dictionary-encoded PKs supports PK prefiltering.
+        let flat_format = read_format.as_flat()?;
+        if !flat_format.raw_batch_has_primary_key_dictionary() {
+            return None;
+        }
+
+        // Compute PK-only projection mask.
+        let num_parquet_columns = parquet_schema.num_columns();
+        let pk_index = primary_key_column_index(num_parquet_columns);
+        let pk_projection = ProjectionMask::roots(parquet_schema, [pk_index]);
+
+        // The PK column is the only column in the projection, so its index is 0.
+        let pk_column_index = 0;
+
+        Some(Self {
+            pk_projection,
+            pk_column_index,
+            codec: Arc::clone(codec),
+            metadata: metadata.clone(),
+            pk_filters: Arc::clone(pk_filters),
+        })
+    }
+
+    /// Builds a [PrefilterContext] for a specific row group.
+    pub(crate) fn build(&self) -> PrefilterContext {
+        // Parquet PK prefilter always supports the partition column. Only
+        // PartitionTreeMemtable skips it after partition pruning.
+        let pk_filter =
+            self.codec
+                .primary_key_filter(&self.metadata, Arc::clone(&self.pk_filters), false);
+        let pk_filter = Box::new(CachedPrimaryKeyFilter::new(pk_filter));
+        PrefilterContext {
+            pk_filter,
+            pk_projection: self.pk_projection.clone(),
+            pk_column_index: self.pk_column_index,
+        }
+    }
+}
+
+/// Result of prefiltering a row group.
+pub(crate) struct PrefilterResult {
+    /// Refined row selection after prefiltering.
+    pub(crate) refined_selection: RowSelection,
+    /// Number of rows filtered out by prefiltering.
+    pub(crate) filtered_rows: usize,
+}
+
+/// Executes prefiltering on a row group.
+///
+/// Reads only the prefilter columns (currently the PK dictionary column),
+/// applies filters, and returns a refined [RowSelection].
+pub(crate) async fn execute_prefilter(
+    prefilter_ctx: &mut PrefilterContext,
+    reader_builder: &RowGroupReaderBuilder,
+    build_ctx: &RowGroupBuildContext<'_>,
+) -> Result<PrefilterResult> {
+    // Reads PK column only.
+    let mut pk_stream = reader_builder
+        .build_with_projection(
+            build_ctx.row_group_idx,
+            build_ctx.row_selection.clone(),
+            prefilter_ctx.pk_projection.clone(),
+            build_ctx.fetch_metrics,
+        )
+        .await?;
+
+    // Applies PK filter to each batch and collect matching row ranges.
+    let mut matched_row_ranges: Vec<Range<usize>> = Vec::new();
+    let mut row_offset = 0;
+    let mut rows_before_filter = 0usize;
+
+    while let Some(batch_result) = pk_stream.next().await {
+        let batch = batch_result.context(ReadParquetSnafu {
+            path: reader_builder.file_path(),
+        })?;
+        let batch_num_rows = batch.num_rows();
+        if batch_num_rows == 0 {
+            continue;
+        }
+        rows_before_filter += batch_num_rows;
+
+        let ranges = matching_row_ranges_by_primary_key(
+            &batch,
+            prefilter_ctx.pk_column_index,
+            prefilter_ctx.pk_filter.as_mut(),
+        )?;
+        matched_row_ranges.extend(
+            ranges
+                .into_iter()
+                .map(|range| (range.start + row_offset)..(range.end + row_offset)),
+        );
+        row_offset += batch_num_rows;
+    }
+
+    // Converts matched ranges to RowSelection.
+    let rows_selected: usize = matched_row_ranges.iter().map(|r| r.end - r.start).sum();
+    let filtered_rows = rows_before_filter.saturating_sub(rows_selected);
+
+    let refined_selection = if rows_selected == 0 {
+        RowSelection::from(vec![])
+    } else {
+        // Build the prefilter selection relative to the yielded rows
+        // (not total_rows), since matched_row_ranges are offsets within
+        // the rows actually read from the stream.
+        let prefilter_selection =
+            row_selection_from_row_ranges_exact(matched_row_ranges.into_iter(), rows_before_filter);
+
+        // Use and_then to apply prefilter selection within the context
+        // of the original selection, since prefilter offsets are relative
+        // to the original selection's selected rows.
+        match &build_ctx.row_selection {
+            Some(original) => original.and_then(&prefilter_selection),
+            None => prefilter_selection,
+        }
+    };
+
+    Ok(PrefilterResult {
+        refined_selection,
+        filtered_rows,
+    })
+}

 #[cfg(test)]
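The `and_then` composition in `execute_prefilter` is the subtle part: the prefilter's matched ranges are offsets among the rows the outer selection yielded, not absolute row-group offsets. A hedged, standalone example using parquet's `RowSelection` from arrow-rs (assuming a version that provides `RowSelection::and_then`, whose second operand must cover exactly the rows the first selects):

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn compose() -> RowSelection {
    // Outer selection over a 10-row row group: skip 2, read rows 2..8.
    let outer = RowSelection::from(vec![
        RowSelector::skip(2),
        RowSelector::select(6),
        RowSelector::skip(2),
    ]);
    // Prefilter result over the 6 yielded rows: keep yielded rows 1..4,
    // which correspond to absolute rows 3..6.
    let prefilter = RowSelection::from(vec![
        RowSelector::skip(1),
        RowSelector::select(3),
        RowSelector::skip(2),
    ]);
    // Result is equivalent to [skip 3, select 3, skip 4]: absolute rows 3..6.
    outer.and_then(&prefilter)
}
```

Intersecting the two selections as if both were absolute would instead keep the wrong rows whenever the outer selection has leading skips — which is exactly what the `fix: apply prefiltered selection by and_then` commit in this PR addresses.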
@@ -245,175 +344,14 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use api::v1::SemanticType;
|
||||
use common_recordbatch::filter::SimpleFilterEvaluator;
|
||||
use datafusion_expr::{col, lit};
|
||||
use datatypes::arrow::array::{
|
||||
ArrayRef, BinaryArray, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array,
|
||||
UInt64Array,
|
||||
};
|
||||
use datatypes::arrow::datatypes::{Schema, UInt32Type};
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use mito_codec::row_converter::{PrimaryKeyFilter, build_primary_key_codec};
|
||||
use mito_codec::row_converter::PrimaryKeyFilter;
|
||||
use store_api::codec::PrimaryKeyEncoding;
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
|
||||
use store_api::storage::ColumnSchema;
|
||||
|
||||
use super::*;
|
||||
use crate::sst::internal_fields;
|
||||
use crate::sst::parquet::format::ReadFormat;
|
||||
use crate::test_util::sst_util::{
|
||||
new_primary_key, sst_region_metadata, sst_region_metadata_with_encoding,
|
||||
};
|
||||
|
||||
fn new_test_filters(exprs: &[datafusion_expr::Expr]) -> Vec<SimpleFilterEvaluator> {
|
||||
exprs
|
||||
.iter()
|
||||
.filter_map(SimpleFilterEvaluator::try_new)
|
||||
.collect()
|
||||
}

    fn expected_metadata_with_reused_tag_name(
        old_metadata: &RegionMetadata,
    ) -> Arc<RegionMetadata> {
        let mut builder = RegionMetadataBuilder::new(old_metadata.region_id);
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_0".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 10,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_1".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            })
            .primary_key(vec![10, 1]);

        Arc::new(builder.build().unwrap())
    }

    fn new_raw_batch_with_metadata(
        metadata: Arc<RegionMetadata>,
        primary_keys: &[&[u8]],
        field_values: &[u64],
    ) -> RecordBatch {
        assert_eq!(primary_keys.len(), field_values.len());

        let arrow_schema = metadata.schema.arrow_schema();
        let field_column = arrow_schema
            .field(arrow_schema.index_of("field_0").unwrap())
            .clone();
        let time_index_column = arrow_schema
            .field(arrow_schema.index_of("ts").unwrap())
            .clone();
        let mut fields = vec![field_column, time_index_column];
        fields.extend(
            internal_fields()
                .into_iter()
                .map(|field| field.as_ref().clone()),
        );
        let schema = Arc::new(Schema::new(fields));

        let mut dict_values = Vec::new();
        let mut keys = Vec::with_capacity(primary_keys.len());
        for pk in primary_keys {
            let key = dict_values
                .iter()
                .position(|existing: &&[u8]| existing == pk)
                .unwrap_or_else(|| {
                    dict_values.push(*pk);
                    dict_values.len() - 1
                });
            keys.push(key as u32);
        }

        let pk_array: ArrayRef = Arc::new(DictionaryArray::<UInt32Type>::new(
            UInt32Array::from(keys),
            Arc::new(BinaryArray::from_iter_values(dict_values.iter().copied())),
        ));

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(field_values.to_vec())),
                Arc::new(TimestampMillisecondArray::from_iter_values(
                    0..primary_keys.len() as i64,
                )),
                pk_array,
                Arc::new(UInt64Array::from(vec![1; primary_keys.len()])),
                Arc::new(UInt8Array::from(vec![1; primary_keys.len()])),
            ],
        )
        .unwrap()
    }

    fn new_raw_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch {
        new_raw_batch_with_metadata(Arc::new(sst_region_metadata()), primary_keys, field_values)
    }

    fn field_values(batch: &RecordBatch) -> Vec<u64> {
        batch
            .column(0)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap()
            .values()
            .to_vec()
    }

    #[test]
    fn test_retain_usable_primary_key_filters_skips_non_tag_filters() {
        let metadata = Arc::new(sst_region_metadata());
        let mut filters =
            new_test_filters(&[col("field_0").eq(lit(1_u64)), col("ts").gt(lit(0_i64))]);

        retain_usable_primary_key_filters(&metadata, None, &mut filters);

        assert!(filters.is_empty());
    }

    #[test]
    fn test_retain_usable_primary_key_filters_skips_reused_expected_tag_name() {
        let metadata = Arc::new(sst_region_metadata());
        let expected_metadata = expected_metadata_with_reused_tag_name(&metadata);
        let mut filters = new_test_filters(&[col("tag_0").eq(lit("b"))]);

        retain_usable_primary_key_filters(
            &metadata,
            Some(expected_metadata.as_ref()),
            &mut filters,
        );

        assert!(filters.is_empty());
    }

    use crate::test_util::sst_util::{new_primary_key, sst_region_metadata_with_encoding};

    #[test]
    fn test_is_usable_primary_key_filter_skips_legacy_primary_key_batches() {
@@ -435,52 +373,16 @@ mod tests {
    }

    #[test]
    fn test_prefilter_primary_key_drops_single_dictionary_batch() {
        let metadata = Arc::new(sst_region_metadata());
        let filters = Arc::new(new_test_filters(&[col("tag_0").eq(lit("b"))]));
        let mut primary_key_filter =
            build_primary_key_codec(metadata.as_ref()).primary_key_filter(&metadata, filters);
        let pk_a = new_primary_key(&["a", "x"]);
        let batch = new_raw_batch(&[pk_a.as_slice(), pk_a.as_slice()], &[10, 11]);

        let filtered =
            prefilter_flat_batch_by_primary_key(batch, primary_key_filter.as_mut()).unwrap();

        assert!(filtered.is_none());
    }

    #[test]
    fn test_is_usable_primary_key_filter_supports_partition_column_by_default() {
        let metadata = Arc::new(sst_region_metadata_with_encoding(
            PrimaryKeyEncoding::Sparse,
        ));
        let filter = SimpleFilterEvaluator::try_new(
            &col(store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME).eq(lit(1_u32)),
        )
        .unwrap();

        assert!(is_usable_primary_key_filter(&metadata, None, &filter));
    }

    #[test]
    fn test_prefilter_primary_key_builds_mask_for_fragmented_matches() {
        let metadata = Arc::new(sst_region_metadata());
        let filters = Arc::new(new_test_filters(&[col("tag_0")
            .eq(lit("a"))
            .or(col("tag_0").eq(lit("c")))]));
        let mut primary_key_filter =
            build_primary_key_codec(metadata.as_ref()).primary_key_filter(&metadata, filters);
        let pk_a = new_primary_key(&["a", "x"]);
        let pk_b = new_primary_key(&["b", "x"]);
        let pk_c = new_primary_key(&["c", "x"]);
        let pk_d = new_primary_key(&["d", "x"]);
        let batch = new_raw_batch(
            &[
                pk_a.as_slice(),
                pk_a.as_slice(),
                pk_b.as_slice(),
                pk_b.as_slice(),
                pk_c.as_slice(),
                pk_c.as_slice(),
                pk_d.as_slice(),
                pk_d.as_slice(),
            ],
            &[10, 11, 12, 13, 14, 15, 16, 17],
        );

        let filtered = prefilter_flat_batch_by_primary_key(batch, primary_key_filter.as_mut())
            .unwrap()
            .unwrap();

        assert_eq!(filtered.num_rows(), 4);
        assert_eq!(field_values(&filtered), vec![10, 11, 14, 15]);
    }

    struct CountingPrimaryKeyFilter {
@@ -489,9 +391,9 @@ mod tests {
    }

    impl PrimaryKeyFilter for CountingPrimaryKeyFilter {
        fn matches(&mut self, pk: &[u8]) -> bool {
        fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
            self.hits.fetch_add(1, Ordering::Relaxed);
            pk == self.expected.as_slice()
            Ok(pk == self.expected.as_slice())
        }
    }

@@ -504,25 +406,14 @@ mod tests {
            expected: expected.clone(),
        }));

        assert!(filter.matches(expected.as_slice()));
        assert!(filter.matches(expected.as_slice()));
        assert!(!filter.matches(new_primary_key(&["b", "x"]).as_slice()));
        assert!(filter.matches(expected.as_slice()).unwrap());
        assert!(filter.matches(expected.as_slice()).unwrap());
        assert!(
            !filter
                .matches(new_primary_key(&["b", "x"]).as_slice())
                .unwrap()
        );

        assert_eq!(hits.load(Ordering::Relaxed), 2);
    }

    #[test]
    fn test_batch_single_primary_key() {
        let pk_a = new_primary_key(&["a", "x"]);
        let pk_b = new_primary_key(&["b", "x"]);

        let batch = new_raw_batch(&[pk_a.as_slice(), pk_a.as_slice()], &[10, 11]);
        assert_eq!(
            batch_single_primary_key(&batch).unwrap(),
            Some(pk_a.as_slice())
        );

        let batch = new_raw_batch(&[pk_a.as_slice(), pk_b.as_slice()], &[10, 11]);
        assert_eq!(batch_single_primary_key(&batch).unwrap(), None);
    }
}

@@ -76,6 +76,9 @@ use crate::sst::parquet::file_range::{
};
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::prefilter::{
    PrefilterContextBuilder, execute_prefilter, is_usable_primary_key_filter,
};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
@@ -459,16 +462,6 @@ impl ParquetReaderBuilder {
            ArrowReaderMetadata::try_new(parquet_meta.clone(), arrow_reader_options)
                .context(ReadDataPartSnafu)?;

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            arrow_metadata,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            cache_strategy: self.cache_strategy.clone(),
        };

        let filters = if let Some(predicate) = &self.predicate {
            predicate
                .exprs()
@@ -493,6 +486,33 @@ impl ParquetReaderBuilder {

        let codec = build_primary_key_codec(read_format.metadata());

        // Extract primary key filters from precomputed filter contexts for prefiltering.
        let primary_key_filters = {
            let pk_filters = filters
                .iter()
                .filter_map(SimpleFilterContext::primary_key_prefilter)
                .collect::<Vec<_>>();
            (!pk_filters.is_empty()).then_some(Arc::new(pk_filters))
        };

        let prefilter_builder = PrefilterContextBuilder::new(
            &read_format,
            &codec,
            primary_key_filters.as_ref(),
            parquet_meta.file_metadata().schema_descr(),
        );

        let reader_builder = RowGroupReaderBuilder {
            file_handle: self.file_handle.clone(),
            file_path,
            parquet_meta,
            arrow_metadata,
            object_store: self.object_store.clone(),
            projection: projection_mask,
            cache_strategy: self.cache_strategy.clone(),
            prefilter_builder,
        };

        let partition_filter = self.build_partition_filter(&read_format, &prune_schema)?;

        let context = FileRangeContext::new(

@@ -1658,6 +1678,25 @@ pub(crate) struct RowGroupReaderBuilder {
    projection: ProjectionMask,
    /// Cache.
    cache_strategy: CacheStrategy,
    /// Pre-built prefilter state. `None` if prefiltering is not applicable.
    prefilter_builder: Option<PrefilterContextBuilder>,
}

/// Context passed to [RowGroupReaderBuilder::build()] carrying all information
/// needed for prefiltering decisions.
pub(crate) struct RowGroupBuildContext<'a> {
    /// Simple filters pushed down. Used by prefilter on other columns.
    #[allow(dead_code)]
    pub(crate) filters: &'a [SimpleFilterContext],
    /// Whether to skip field filters. Used by prefilter on other columns.
    #[allow(dead_code)]
    pub(crate) skip_fields: bool,
    /// Index of the row group to read.
    pub(crate) row_group_idx: usize,
    /// Row selection for the row group. `None` means all rows.
    pub(crate) row_selection: Option<RowSelection>,
    /// Metrics for tracking fetch operations.
    pub(crate) fetch_metrics: Option<&'a ParquetFetchMetrics>,
}

impl RowGroupReaderBuilder {
@@ -1679,11 +1718,58 @@ impl RowGroupReaderBuilder {
        &self.cache_strategy
    }

    pub(crate) fn has_flat_primary_key_prefilter(&self) -> bool {
        self.prefilter_builder.is_some()
    }

    /// Builds a [ParquetRecordBatchStream] to read the row group at `row_group_idx`.
    ///
    /// If prefiltering is applicable (based on `build_ctx`), this performs a two-phase read:
    /// 1. Reads only the prefilter columns (e.g. PK column), applies filters to get a refined row selection
    /// 2. Reads the full projection with the refined row selection
    pub(crate) async fn build(
        &self,
        build_ctx: RowGroupBuildContext<'_>,
    ) -> Result<ParquetRecordBatchStream<SstAsyncFileReader>> {
        let prefilter_ctx = self.prefilter_builder.as_ref().map(|b| b.build());

        let Some(mut prefilter_ctx) = prefilter_ctx else {
            // No prefilter applicable, build stream with full projection.
            return self
                .build_with_projection(
                    build_ctx.row_group_idx,
                    build_ctx.row_selection,
                    self.projection.clone(),
                    build_ctx.fetch_metrics,
                )
                .await;
        };

        let prefilter_start = Instant::now();
        let prefilter_result = execute_prefilter(&mut prefilter_ctx, self, &build_ctx).await?;
        if let Some(metrics) = build_ctx.fetch_metrics {
            let mut data = metrics.data.lock().unwrap();
            data.prefilter_cost += prefilter_start.elapsed();
            data.prefilter_filtered_rows += prefilter_result.filtered_rows;
        }

        let refined_selection = Some(prefilter_result.refined_selection);

        self.build_with_projection(
            build_ctx.row_group_idx,
            refined_selection,
            self.projection.clone(),
            build_ctx.fetch_metrics,
        )
        .await
    }
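
    // Call-site sketch (illustrative comment; local names are assumptions).
    // Within the crate a caller can assemble the context directly, though the
    // call sites in this diff go through a `build_context` helper on the
    // reader's file-range context:
    //
    // let build_ctx = RowGroupBuildContext {
    //     filters: &filters,
    //     skip_fields,
    //     row_group_idx,
    //     row_selection: Some(row_selection),
    //     fetch_metrics: Some(&fetch_metrics),
    // };
    // let stream = reader_builder.build(build_ctx).await?;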

    /// Builds a [ParquetRecordBatchStream] with a custom projection mask.
    pub(crate) async fn build_with_projection(
        &self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
        projection: ProjectionMask,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<ParquetRecordBatchStream<SstAsyncFileReader>> {
        // Create async file reader with caching support.
@@ -1704,7 +1790,7 @@ impl RowGroupReaderBuilder {
        );
        builder = builder
            .with_row_groups(vec![row_group_idx])
            .with_projection(self.projection.clone())
            .with_projection(projection)
            .with_batch_size(DEFAULT_READ_BATCH_SIZE);

        if let Some(selection) = row_selection {
@@ -1739,6 +1825,8 @@ pub(crate) struct SimpleFilterContext {
    semantic_type: SemanticType,
    /// The data type of the column.
    data_type: ConcreteDataType,
    /// Whether this filter can be applied by flat parquet primary-key prefiltering.
    usable_primary_key_filter: bool,
}

impl SimpleFilterContext {
@@ -1752,6 +1840,10 @@ impl SimpleFilterContext {
        expr: &Expr,
    ) -> Option<Self> {
        let filter = SimpleFilterEvaluator::try_new(expr)?;
        // Parquet PK prefilter always supports the partition column. Only
        // PartitionTreeMemtable skips it after partition pruning.
        let usable_primary_key_filter =
            is_usable_primary_key_filter(sst_meta, expected_meta, &filter);
        let (column_metadata, maybe_filter) = match expected_meta {
            Some(meta) => {
                // Gets the column metadata from the expected metadata.
@@ -1782,11 +1874,15 @@ impl SimpleFilterContext {
            }
        };

        let usable_primary_key_filter =
            matches!(maybe_filter, MaybeFilter::Filter(_)) && usable_primary_key_filter;

        Some(Self {
            filter: maybe_filter,
            column_id: column_metadata.column_id,
            semantic_type: column_metadata.semantic_type,
            data_type: column_metadata.column_schema.data_type.clone(),
            usable_primary_key_filter,
        })
    }

@@ -1809,6 +1905,23 @@ impl SimpleFilterContext {
    pub(crate) fn data_type(&self) -> &ConcreteDataType {
        &self.data_type
    }

    /// Returns whether this filter is eligible for flat parquet PK prefiltering.
    pub(crate) fn usable_primary_key_filter(&self) -> bool {
        self.usable_primary_key_filter
    }

    /// Returns the filter evaluator when it is eligible for PK prefiltering.
    pub(crate) fn primary_key_prefilter(&self) -> Option<SimpleFilterEvaluator> {
        if !self.usable_primary_key_filter {
            return None;
        }

        match &self.filter {
            MaybeFilter::Filter(filter) => Some(filter.clone()),
            MaybeFilter::Matched | MaybeFilter::Pruned => None,
        }
    }
}

/// Prune a column by its default value.
@@ -1856,17 +1969,17 @@ impl ParquetReader {
            return Ok(None);
        };

        let skip_fields = self.context.should_skip_fields(row_group_idx);
        let parquet_reader = self
            .context
            .reader_builder()
            .build(
            .build(self.context.build_context(
                row_group_idx,
                Some(row_selection),
                Some(&self.fetch_metrics),
            )
                skip_fields,
            ))
            .await?;

        let skip_fields = self.context.should_skip_fields(row_group_idx);
        self.reader = Some(FlatPruneReader::new_with_row_group_reader(
            self.context.clone(),
            FlatRowGroupReader::new(self.context.clone(), parquet_reader),
@@ -1889,11 +2002,16 @@ impl ParquetReader {
        debug_assert!(context.read_format().as_flat().is_some());
        let fetch_metrics = ParquetFetchMetrics::default();
        let reader = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
            let skip_fields = context.should_skip_fields(row_group_idx);
            let parquet_reader = context
                .reader_builder()
                .build(row_group_idx, Some(row_selection), Some(&fetch_metrics))
                .build(context.build_context(
                    row_group_idx,
                    Some(row_selection),
                    Some(&fetch_metrics),
                    skip_fields,
                ))
                .await?;
            let skip_fields = context.should_skip_fields(row_group_idx);
            Some(FlatPruneReader::new_with_row_group_reader(
                context.clone(),
                FlatRowGroupReader::new(context.clone(), parquet_reader),
@@ -2111,11 +2229,15 @@ mod tests {
    use datafusion_expr::expr::ScalarFunction;
    use datafusion_expr::{
        ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
        col, lit,
    };
    use datatypes::arrow::array::{ArrayRef, Int64Array};
    use datatypes::arrow::record_batch::RecordBatch;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use object_store::services::Memory;
    use parquet::arrow::ArrowWriter;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::region_request::PathType;
    use table::predicate::Predicate;

@@ -2207,4 +2329,80 @@ mod tests {

        assert!(!selection.is_empty());
    }

    fn expected_metadata_with_reused_tag_name(
        old_metadata: &RegionMetadata,
    ) -> Arc<RegionMetadata> {
        let mut builder = RegionMetadataBuilder::new(old_metadata.region_id);
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_0".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 10,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_1".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            })
            .primary_key(vec![10, 1]);

        Arc::new(builder.build().unwrap())
    }

    #[test]
    fn test_simple_filter_context_marks_usable_primary_key_filter() {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let ctx =
            SimpleFilterContext::new_opt(&metadata, None, &col("tag_0").eq(lit("a"))).unwrap();

        assert!(ctx.usable_primary_key_filter());
        assert!(ctx.primary_key_prefilter().is_some());
    }

    #[test]
    fn test_simple_filter_context_skips_non_usable_primary_key_filter() {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());

        let field_ctx =
            SimpleFilterContext::new_opt(&metadata, None, &col("field_0").eq(lit(1_u64))).unwrap();
        assert!(!field_ctx.usable_primary_key_filter());
        assert!(field_ctx.primary_key_prefilter().is_none());

        let expected_metadata = expected_metadata_with_reused_tag_name(metadata.as_ref());
        let mismatched_ctx = SimpleFilterContext::new_opt(
            &metadata,
            Some(expected_metadata.as_ref()),
            &col("tag_0").eq(lit("a")),
        )
        .unwrap();
        assert!(!mismatched_ctx.usable_primary_key_filter());
        assert!(mismatched_ctx.primary_key_prefilter().is_none());
    }
}

@@ -48,12 +48,16 @@ pub struct ParquetFetchMetricsData {
    pub store_fetch_elapsed: std::time::Duration,
    /// Total elapsed time for fetching row groups.
    pub total_fetch_elapsed: std::time::Duration,
    /// Elapsed time for prefilter execution.
    pub prefilter_cost: std::time::Duration,
    /// Number of rows filtered out by prefiltering.
    pub prefilter_filtered_rows: usize,
}

impl ParquetFetchMetricsData {
    /// Returns true if the metrics are empty (contain no meaningful data).
    fn is_empty(&self) -> bool {
        self.total_fetch_elapsed.is_zero()
        self.total_fetch_elapsed.is_zero() && self.prefilter_cost.is_zero()
    }
}

@@ -84,6 +88,8 @@ impl std::fmt::Debug for ParquetFetchMetrics {
            write_cache_fetch_elapsed,
            store_fetch_elapsed,
            total_fetch_elapsed,
            prefilter_cost,
            prefilter_filtered_rows,
        } = *data;

        write!(f, "{{")?;
@@ -142,6 +148,16 @@ impl std::fmt::Debug for ParquetFetchMetrics {
        if !store_fetch_elapsed.is_zero() {
            write!(f, ", \"store_fetch_elapsed\":\"{:?}\"", store_fetch_elapsed)?;
        }
        if !prefilter_cost.is_zero() {
            write!(f, ", \"prefilter_cost\":\"{:?}\"", prefilter_cost)?;
        }
        if prefilter_filtered_rows > 0 {
            write!(
                f,
                ", \"prefilter_filtered_rows\":{}",
                prefilter_filtered_rows
            )?;
        }

        write!(f, "}}")
    }
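
    // Rendered shape for reference (values illustrative, derived from the
    // `write!` calls above): each non-zero field is written as a
    // `, "key":value` pair between the braces, e.g.
    // {..., "prefilter_cost":"1.8ms", "prefilter_filtered_rows":4096}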
@@ -169,6 +185,8 @@ impl ParquetFetchMetrics {
            write_cache_fetch_elapsed,
            store_fetch_elapsed,
            total_fetch_elapsed,
            prefilter_cost,
            prefilter_filtered_rows,
        } = *other.data.lock().unwrap();

        let mut data = self.data.lock().unwrap();
@@ -185,6 +203,8 @@ impl ParquetFetchMetrics {
        data.write_cache_fetch_elapsed += write_cache_fetch_elapsed;
        data.store_fetch_elapsed += store_fetch_elapsed;
        data.total_fetch_elapsed += total_fetch_elapsed;
        data.prefilter_cost += prefilter_cost;
        data.prefilter_filtered_rows += prefilter_filtered_rows;
    }
}

@@ -554,11 +554,43 @@ fn intersect_row_selections(left: &RowSelection, right: &RowSelection) -> RowSel
/// or if there's a gap that requires skipping rows. It handles both "select" and "skip" actions,
/// optimizing the list of selectors by merging contiguous actions of the same type.
///
/// The returned selection intentionally stops at the end of the last matched range and may omit a
/// trailing `skip` that would extend it to `total_row_count`. That is fine when the selection is
/// used directly by the parquet reader, which simply stops once the selectors are exhausted.
///
/// Note: overlapping ranges are not supported and will result in an incorrect selection.
pub(crate) fn row_selection_from_row_ranges(
    row_ranges: impl Iterator<Item = Range<usize>>,
    total_row_count: usize,
) -> RowSelection {
    let (selectors, _) = build_selectors_from_row_ranges(row_ranges, total_row_count);
    RowSelection::from(selectors)
}
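
// Example (derived from the doc comment above, not from the diff): ranges
// [1..3, 5..8] with total_row_count = 10 produce
// [skip(1), select(2), skip(2), select(3)]; the trailing skip(2) up to row 10
// is intentionally omitted here, unlike the `_exact` variant below.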

/// Like [`row_selection_from_row_ranges`] but guarantees the resulting selection
/// covers exactly `total_row_count` rows by appending a trailing skip if needed.
///
/// Required when the result is used as the inner operand of [`RowSelection::and_then`], because
/// `and_then` expects the inner selection to account for every row selected by the outer one.
pub(crate) fn row_selection_from_row_ranges_exact(
    row_ranges: impl Iterator<Item = Range<usize>>,
    total_row_count: usize,
) -> RowSelection {
    let (mut selectors, last_processed_end) =
        build_selectors_from_row_ranges(row_ranges, total_row_count);
    if last_processed_end < total_row_count {
        // Preserve the full logical length of the selection even when the final rows are all
        // filtered out. Without this trailing skip, `and_then` sees an undersized inner
        // selection and panics.
        add_or_merge_selector(&mut selectors, total_row_count - last_processed_end, true);
    }
    RowSelection::from(selectors)
}

fn build_selectors_from_row_ranges(
    row_ranges: impl Iterator<Item = Range<usize>>,
    total_row_count: usize,
) -> (Vec<RowSelector>, usize) {
    let mut selectors: Vec<RowSelector> = Vec::new();
    let mut last_processed_end = 0;

@@ -572,7 +604,7 @@ pub(crate) fn row_selection_from_row_ranges(
        last_processed_end = end;
    }

    RowSelection::from(selectors)
    (selectors, last_processed_end)
}

/// Converts an iterator of sorted row IDs into a `RowSelection`.
@@ -707,6 +739,56 @@ mod tests {
        assert_eq!(selection, expected);
    }

    #[test]
    fn test_exact_single_range_with_trailing_skip() {
        let selection = row_selection_from_row_ranges_exact(Some(0..3).into_iter(), 6);
        let expected = RowSelection::from(vec![RowSelector::select(3), RowSelector::skip(3)]);
        assert_eq!(selection, expected);
        assert_eq!(selection.row_count(), 3);
    }

    #[test]
    fn test_exact_non_contiguous_ranges() {
        let ranges = [1..3, 5..8];
        let selection = row_selection_from_row_ranges_exact(ranges.iter().cloned(), 10);
        let expected = RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(2),
            RowSelector::skip(2),
            RowSelector::select(3),
            RowSelector::skip(2),
        ]);
        assert_eq!(selection, expected);
        assert_eq!(selection.row_count(), 5);
    }

    #[test]
    fn test_exact_empty_ranges() {
        let selection = row_selection_from_row_ranges_exact([].iter().cloned(), 10);
        let expected = RowSelection::from(vec![RowSelector::skip(10)]);
        assert_eq!(selection, expected);
        assert_eq!(selection.row_count(), 0);
    }

    #[test]
    fn test_exact_range_covers_all_rows() {
        let selection = row_selection_from_row_ranges_exact(Some(0..10).into_iter(), 10);
        let expected = RowSelection::from(vec![RowSelector::select(10)]);
        assert_eq!(selection, expected);
        assert_eq!(selection.row_count(), 10);
    }

    #[test]
    fn test_exact_compatible_with_and_then() {
        // Outer selects rows 0..6 out of 10.
        let outer = RowSelection::from(vec![RowSelector::select(6), RowSelector::skip(4)]);
        // Inner: within those 6 rows, select only rows 0..3.
        let inner = row_selection_from_row_ranges_exact(Some(0..3).into_iter(), 6);
        let result = outer.and_then(&inner);
        let expected = RowSelection::from(vec![RowSelector::select(3), RowSelector::skip(7)]);
        assert_eq!(result, expected);
    }

    #[test]
    fn test_row_ids_to_selection() {
        let row_ids = [1, 3, 5, 7, 9].into_iter();