feat: precise filter for mito parquet reader (#3178)

* impl SimpleFilterEvaluator

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* time index and field filter

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* finish parquet filter

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove empty Batch

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix fmt

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update metric

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use projected schema from batch

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* correct naming

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unnecessary error

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Author: Ruihang Xia
Date: 2024-01-18 14:59:48 +08:00
Committed by: GitHub
Parent: 63205907fb
Commit: cde5a36f5e
17 changed files with 652 additions and 32 deletions
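This commit adds a SimpleFilterEvaluator to common-recordbatch and wires it into the mito parquet reader so that simple `col op literal` predicates are applied precisely to each decoded Batch, on top of the existing row-group pruning. As a rough illustration only (not part of the diff), the sketch below shows the intended shape of the new API: build an evaluator from a DataFusion expr and run it against an Arrow array. It mirrors the `field_0 >= 150` predicate used in one of the new reader tests.

// Illustrative sketch only; names follow the code added in this commit.
use std::sync::Arc;

use common_recordbatch::filter::SimpleFilterEvaluator;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datatypes::arrow::array::{ArrayRef, UInt64Array};

fn sketch() {
    // field_0 >= 150
    let expr = Expr::BinaryExpr(BinaryExpr {
        left: Box::new(Expr::Column(Column {
            relation: None,
            name: "field_0".to_string(),
        })),
        op: Operator::GtEq,
        right: Box::new(Expr::Literal(ScalarValue::UInt64(Some(150)))),
    });
    // Only `col op literal` (or the swapped form) yields Some(..).
    let evaluator = SimpleFilterEvaluator::try_new(&expr).unwrap();

    let values: ArrayRef = Arc::new(UInt64Array::from(vec![100u64, 150, 200]));
    // Returns a BooleanBuffer with bits [false, true, true].
    let mask = evaluator.evaluate_array(&values).unwrap();
    assert_eq!(mask.count_set_bits(), 2);
}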

Cargo.lock (generated)

@@ -1916,6 +1916,7 @@ name = "common-recordbatch"
version = "0.6.0"
dependencies = [
"arc-swap",
"common-base",
"common-error",
"common-macro",
"datafusion",


@@ -6,6 +6,7 @@ license.workspace = true
[dependencies]
arc-swap = "1.6"
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
datafusion-common.workspace = true


@@ -107,6 +107,16 @@ pub enum Error {
location: Location,
source: datatypes::error::Error,
},
#[snafu(display("Error occurs when performing arrow computation"))]
ArrowCompute {
#[snafu(source)]
error: datatypes::arrow::error::ArrowError,
location: Location,
},
#[snafu(display("Unsupported operation: {}", reason))]
UnsupportedOperation { reason: String, location: Location },
}
impl ErrorExt for Error {
@@ -120,10 +130,13 @@ impl ErrorExt for Error {
| Error::Format { .. }
| Error::InitRecordbatchStream { .. }
| Error::ColumnNotExists { .. }
| Error::ProjectArrowRecordBatch { .. } => StatusCode::Internal,
| Error::ProjectArrowRecordBatch { .. }
| Error::ArrowCompute { .. } => StatusCode::Internal,
Error::External { source, .. } => source.status_code(),
Error::UnsupportedOperation { .. } => StatusCode::Unsupported,
Error::SchemaConversion { source, .. } | Error::CastVector { source, .. } => {
source.status_code()
}


@@ -0,0 +1,258 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities for performing precise filtering on record batches.
use datafusion::logical_expr::{Expr, Operator};
use datafusion_common::arrow::array::{ArrayRef, Datum, Scalar};
use datafusion_common::arrow::buffer::BooleanBuffer;
use datafusion_common::arrow::compute::kernels::cmp;
use datafusion_common::ScalarValue;
use datatypes::vectors::VectorRef;
use snafu::ResultExt;
use crate::error::{ArrowComputeSnafu, Result, UnsupportedOperationSnafu};
/// An in-place expr evaluator for simple filters. Only supports
/// - `col` `op` `literal`
/// - `literal` `op` `col`
///
/// where `op` is one of `=`, `!=`, `>`, `>=`, `<`, `<=`.
///
/// This struct holds the normalized predicate expr in the form
/// `col` `op` `literal`, where the value of `col` is provided by the input.
#[derive(Debug)]
pub struct SimpleFilterEvaluator {
/// Name of the referenced column.
column_name: String,
/// The literal value.
literal: Scalar<ArrayRef>,
/// The operator.
op: Operator,
}
impl SimpleFilterEvaluator {
pub fn try_new(predicate: &Expr) -> Option<Self> {
match predicate {
Expr::BinaryExpr(binary) => {
// check if the expr is in the supported form
match binary.op {
Operator::Eq
| Operator::NotEq
| Operator::Lt
| Operator::LtEq
| Operator::Gt
| Operator::GtEq => {}
_ => return None,
}
// swap the expr if it is in the form of `literal` `op` `col`
let mut op = binary.op;
let (lhs, rhs) = match (&*binary.left, &*binary.right) {
(Expr::Column(ref col), Expr::Literal(ref lit)) => (col, lit),
(Expr::Literal(ref lit), Expr::Column(ref col)) => {
// Safety: the previous check ensures the operator can be swapped.
op = op.swap().unwrap();
(col, lit)
}
_ => return None,
};
Some(Self {
column_name: lhs.name.clone(),
literal: rhs.clone().to_scalar(),
op,
})
}
_ => None,
}
}
/// Get the name of the referenced column.
pub fn column_name(&self) -> &str {
&self.column_name
}
pub fn evaluate_scalar(&self, input: &ScalarValue) -> Result<bool> {
let result = self.evaluate_datum(&input.to_scalar())?;
Ok(result.value(0))
}
pub fn evaluate_array(&self, input: &ArrayRef) -> Result<BooleanBuffer> {
self.evaluate_datum(input)
}
pub fn evaluate_vector(&self, input: &VectorRef) -> Result<BooleanBuffer> {
self.evaluate_datum(&input.to_arrow_array())
}
fn evaluate_datum(&self, input: &impl Datum) -> Result<BooleanBuffer> {
let result = match self.op {
Operator::Eq => cmp::eq(input, &self.literal),
Operator::NotEq => cmp::neq(input, &self.literal),
Operator::Lt => cmp::lt(input, &self.literal),
Operator::LtEq => cmp::lt_eq(input, &self.literal),
Operator::Gt => cmp::gt(input, &self.literal),
Operator::GtEq => cmp::gt_eq(input, &self.literal),
_ => {
return UnsupportedOperationSnafu {
reason: format!("{:?}", self.op),
}
.fail()
}
};
result
.context(ArrowComputeSnafu)
.map(|array| array.values().clone())
}
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use datafusion::logical_expr::BinaryExpr;
use datafusion_common::Column;
use super::*;
#[test]
fn unsupported_filter_op() {
// `+` is not supported
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(Column {
relation: None,
name: "foo".to_string(),
})),
op: Operator::Plus,
right: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))),
});
assert!(SimpleFilterEvaluator::try_new(&expr).is_none());
// two literals are not supported
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))),
op: Operator::Eq,
right: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))),
});
assert!(SimpleFilterEvaluator::try_new(&expr).is_none());
// two columns are not supported
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(Column {
relation: None,
name: "foo".to_string(),
})),
op: Operator::Eq,
right: Box::new(Expr::Column(Column {
relation: None,
name: "bar".to_string(),
})),
});
assert!(SimpleFilterEvaluator::try_new(&expr).is_none());
// compound expr is not supported
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(Column {
relation: None,
name: "foo".to_string(),
})),
op: Operator::Eq,
right: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))),
})),
op: Operator::Eq,
right: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))),
});
assert!(SimpleFilterEvaluator::try_new(&expr).is_none());
}
#[test]
fn supported_filter_op() {
// equal
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(Column {
relation: None,
name: "foo".to_string(),
})),
op: Operator::Eq,
right: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))),
});
let _ = SimpleFilterEvaluator::try_new(&expr).unwrap();
// swap operands
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))),
op: Operator::Lt,
right: Box::new(Expr::Column(Column {
relation: None,
name: "foo".to_string(),
})),
});
let evaluator = SimpleFilterEvaluator::try_new(&expr).unwrap();
assert_eq!(evaluator.op, Operator::Gt);
assert_eq!(evaluator.column_name, "foo".to_string());
}
#[test]
fn run_on_array() {
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(Column {
relation: None,
name: "foo".to_string(),
})),
op: Operator::Eq,
right: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))),
});
let evaluator = SimpleFilterEvaluator::try_new(&expr).unwrap();
let input_1 = Arc::new(datatypes::arrow::array::Int64Array::from(vec![1, 2, 3])) as _;
let result = evaluator.evaluate_array(&input_1).unwrap();
assert_eq!(result, BooleanBuffer::from(vec![true, false, false]));
let input_2 = Arc::new(datatypes::arrow::array::Int64Array::from(vec![1, 1, 1])) as _;
let result = evaluator.evaluate_array(&input_2).unwrap();
assert_eq!(result, BooleanBuffer::from(vec![true, true, true]));
let input_3 = Arc::new(datatypes::arrow::array::Int64Array::new_null(0)) as _;
let result = evaluator.evaluate_array(&input_3).unwrap();
assert_eq!(result, BooleanBuffer::from(vec![]));
}
#[test]
fn run_on_scalar() {
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(Column {
relation: None,
name: "foo".to_string(),
})),
op: Operator::Lt,
right: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))),
});
let evaluator = SimpleFilterEvaluator::try_new(&expr).unwrap();
let input_1 = ScalarValue::Int64(Some(1));
let result = evaluator.evaluate_scalar(&input_1).unwrap();
assert!(!result);
let input_2 = ScalarValue::Int64(Some(0));
let result = evaluator.evaluate_scalar(&input_2).unwrap();
assert!(result);
let input_3 = ScalarValue::Int64(None);
let result = evaluator.evaluate_scalar(&input_3).unwrap();
assert!(!result);
}
}
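Beyond the array and scalar paths exercised by the tests above, the reader changes later in this commit also call evaluate_vector for field and time-index columns and evaluate_scalar for decoded primary-key (tag) values. A hedged sketch of those two entry points follows; it assumes an `Int64Vector::from_vec` constructor in the datatypes crate and is illustrative only, not part of the diff.

use std::sync::Arc;

use common_recordbatch::filter::SimpleFilterEvaluator;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datatypes::vectors::{Int64Vector, VectorRef};

fn sketch() {
    // foo < 5
    let expr = Expr::BinaryExpr(BinaryExpr {
        left: Box::new(Expr::Column(Column {
            relation: None,
            name: "foo".to_string(),
        })),
        op: Operator::Lt,
        right: Box::new(Expr::Literal(ScalarValue::Int64(Some(5)))),
    });
    let evaluator = SimpleFilterEvaluator::try_new(&expr).unwrap();

    // Field/time-index style: evaluate a whole column vector at once.
    // Assumption: `Int64Vector::from_vec` is available in the datatypes crate.
    let column: VectorRef = Arc::new(Int64Vector::from_vec(vec![1, 7, 3]));
    let mask = evaluator.evaluate_vector(&column).unwrap(); // bits [true, false, true]
    assert_eq!(mask.count_set_bits(), 2);

    // Tag style: a batch shares one primary key, so a single scalar decides
    // whether the whole batch survives this filter.
    let keep = evaluator.evaluate_scalar(&ScalarValue::Int64(Some(2))).unwrap();
    assert!(keep);
}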


@@ -14,6 +14,7 @@
pub mod adapter;
pub mod error;
pub mod filter;
mod recordbatch;
pub mod util;


@@ -76,16 +76,11 @@ async fn test_read_parquet_stats() {
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 10 | 10.0 | 1970-01-01T00:00:10 |
| 11 | 11.0 | 1970-01-01T00:00:11 |
| 12 | 12.0 | 1970-01-01T00:00:12 |
| 13 | 13.0 | 1970-01-01T00:00:13 |
| 14 | 14.0 | 1970-01-01T00:00:14 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
| 3 | 3.0 | 1970-01-01T00:00:03 |
| 4 | 4.0 | 1970-01-01T00:00:04 |
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 6 | 6.0 | 1970-01-01T00:00:06 |
| 7 | 7.0 | 1970-01-01T00:00:07 |


@@ -536,6 +536,12 @@ pub enum Error {
error: std::io::Error,
location: Location,
},
#[snafu(display("Failed to filter record batch"))]
FilterRecordBatch {
source: common_recordbatch::error::Error,
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -632,6 +638,7 @@ impl ErrorExt for Error {
CleanDir { .. } => StatusCode::Unexpected,
InvalidConfig { .. } => StatusCode::InvalidArguments,
StaleLogEntry { .. } => StatusCode::Unexpected,
FilterRecordBatch { source, .. } => source.status_code(),
Upload { .. } => StatusCode::StorageUnavailable,
}
}


@@ -124,6 +124,9 @@ lazy_static! {
/// Counter of row groups read.
pub static ref READ_ROW_GROUPS_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_read_row_groups_total", "mito read row groups total", &[TYPE_LABEL]).unwrap();
/// Counter of rows filtered out by the precise filter.
pub static ref PRECISE_FILTER_ROWS_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_precise_filter_rows_total", "mito precise filter rows total", &[TYPE_LABEL]).unwrap();
// ------- End of query metrics.
// Cache related metrics.


@@ -33,7 +33,7 @@ use datatypes::arrow::compute::SortOptions;
use datatypes::arrow::row::{RowConverter, SortField};
use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
use datatypes::types::TimestampType;
use datatypes::value::ValueRef;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::{
BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, UInt8Vector,
@@ -58,6 +58,8 @@ use crate::memtable::BoxedBatchIterator;
pub struct Batch {
/// Primary key encoded in a comparable form.
primary_key: Vec<u8>,
/// Possibly decoded `primary_key` values. Some code paths may decode them in advance.
pk_values: Option<Vec<Value>>,
/// Timestamps of rows, should be sorted and not null.
timestamps: VectorRef,
/// Sequences of rows
@@ -104,6 +106,22 @@ impl Batch {
&self.primary_key
}
/// Returns possibly decoded primary-key values.
pub fn pk_values(&self) -> Option<&[Value]> {
self.pk_values.as_deref()
}
/// Sets possibly decoded primary-key values.
pub fn set_pk_values(&mut self, pk_values: Vec<Value>) {
self.pk_values = Some(pk_values);
}
/// Removes possibly decoded primary-key values. For testing only.
#[cfg(any(test, feature = "test"))]
pub fn remove_pk_values(&mut self) {
self.pk_values = None;
}
/// Returns fields in the batch.
pub fn fields(&self) -> &[BatchColumn] {
&self.fields
@@ -195,6 +213,7 @@ impl Batch {
// Now we need to clone the primary key. We could try `Bytes` if
// this becomes a bottleneck.
primary_key: self.primary_key.clone(),
pk_values: self.pk_values.clone(),
timestamps: self.timestamps.slice(offset, length),
sequences: Arc::new(self.sequences.get_slice(offset, length)),
op_types: Arc::new(self.op_types.get_slice(offset, length)),
@@ -653,6 +672,7 @@ impl BatchBuilder {
Ok(Batch {
primary_key: self.primary_key,
pk_values: None,
timestamps,
sequences,
op_types,


@@ -177,10 +177,14 @@ impl ProjectionMapper {
// Skips decoding pk if we don't need to output it.
let pk_values = if self.has_tags {
self.codec
.decode(batch.primary_key())
.map_err(BoxedError::new)
.context(ExternalSnafu)?
match batch.pk_values() {
Some(v) => v.to_vec(),
None => self
.codec
.decode(batch.primary_key())
.map_err(BoxedError::new)
.context(ExternalSnafu)?,
}
} else {
Vec::new()
};


@@ -77,6 +77,9 @@ mod tests {
use std::sync::Arc;
use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use table::predicate::Predicate;
use super::*;
use crate::cache::{CacheManager, PageKey};
@@ -283,4 +286,140 @@ mod tests {
offset_index,
);
}
#[tokio::test]
async fn test_read_with_tag_filter() {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = handle.file_path(FILE_DIR);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 200),
]);
// Use a small row group size for test.
let write_opts = WriteOptions {
row_group_size: 50,
..Default::default()
};
// Prepare data.
let mut writer = ParquetWriter::new(
file_path,
metadata.clone(),
object_store.clone(),
Indexer::default(),
);
writer
.write_all(source, &write_opts)
.await
.unwrap()
.unwrap();
// Predicate
let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(Column {
relation: None,
name: "tag_0".to_string(),
})),
op: Operator::Eq,
right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))),
})
.into()]));
let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
.predicate(predicate);
let mut reader = builder.build().await.unwrap();
check_reader_result(
&mut reader,
&[
new_batch_by_range(&["a", "d"], 0, 50),
new_batch_by_range(&["a", "d"], 50, 60),
],
)
.await;
}
#[tokio::test]
async fn test_read_empty_batch() {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = handle.file_path(FILE_DIR);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "z"], 0, 0),
new_batch_by_range(&["a", "z"], 100, 100),
new_batch_by_range(&["a", "z"], 200, 230),
]);
// Use a small row group size for test.
let write_opts = WriteOptions {
row_group_size: 50,
..Default::default()
};
// Prepare data.
let mut writer = ParquetWriter::new(
file_path,
metadata.clone(),
object_store.clone(),
Indexer::default(),
);
writer
.write_all(source, &write_opts)
.await
.unwrap()
.unwrap();
let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
let mut reader = builder.build().await.unwrap();
check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
}
#[tokio::test]
async fn test_read_with_field_filter() {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = handle.file_path(FILE_DIR);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 200),
]);
// Use a small row group size for test.
let write_opts = WriteOptions {
row_group_size: 50,
..Default::default()
};
// Prepare data.
let mut writer = ParquetWriter::new(
file_path,
metadata.clone(),
object_store.clone(),
Indexer::default(),
);
writer
.write_all(source, &write_opts)
.await
.unwrap()
.unwrap();
// Predicate
let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(Column {
relation: None,
name: "field_0".to_string(),
})),
op: Operator::GtEq,
right: Box::new(Expr::Literal(ScalarValue::UInt64(Some(150)))),
})
.into()]));
let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
.predicate(predicate);
let mut reader = builder.build().await.unwrap();
check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
}
}


@@ -118,8 +118,14 @@ pub(crate) struct ReadFormat {
metadata: RegionMetadataRef,
/// SST file schema.
arrow_schema: SchemaRef,
// Field column id to its index in `schema` (SST schema).
/// Field column id to its index in `schema` (SST schema).
/// In SST schema, fields are stored in the front of the schema.
field_id_to_index: HashMap<ColumnId, usize>,
/// Field column id to its index in the projected schema (the schema of [Batch]).
///
/// This field is set at the first call to [convert_record_batch](Self::convert_record_batch).
field_id_to_projected_index: Option<HashMap<ColumnId, usize>>,
}
impl ReadFormat {
@@ -136,6 +142,7 @@ impl ReadFormat {
metadata,
arrow_schema,
field_id_to_index,
field_id_to_projected_index: None,
}
}
@@ -180,7 +187,7 @@ impl ReadFormat {
///
/// Note that the `record_batch` may only contain a subset of columns if it is projected.
pub(crate) fn convert_record_batch(
&self,
&mut self,
record_batch: &RecordBatch,
batches: &mut VecDeque<Batch>,
) -> Result<()> {
@@ -197,6 +204,10 @@ impl ReadFormat {
}
);
if self.field_id_to_projected_index.is_none() {
self.init_id_to_projected_index(record_batch);
}
let mut fixed_pos_columns = record_batch
.columns()
.iter()
@@ -259,6 +270,19 @@ impl ReadFormat {
Ok(())
}
fn init_id_to_projected_index(&mut self, record_batch: &RecordBatch) {
let mut name_to_projected_index = HashMap::new();
for (index, field) in record_batch.schema().fields().iter().enumerate() {
let Some(column) = self.metadata.column_by_name(field.name()) else {
continue;
};
if column.semantic_type == SemanticType::Field {
name_to_projected_index.insert(column.column_id, index);
}
}
self.field_id_to_projected_index = Some(name_to_projected_index);
}
/// Returns min values of specific column in row groups.
pub(crate) fn min_values(
&self,
@@ -478,15 +502,25 @@ impl ReadFormat {
Some(Arc::new(UInt64Array::from_iter(values)))
}
/// Field index of the primary key.
/// Index in SST of the primary key.
fn primary_key_position(&self) -> usize {
self.arrow_schema.fields.len() - 3
}
/// Field index of the time index.
/// Index in SST of the time index.
fn time_index_position(&self) -> usize {
self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
}
/// Index of a field column by its column id.
/// This function is only available after the first call to
/// [convert_record_batch](Self::convert_record_batch). Otherwise
/// it always returns `None`.
pub fn field_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
self.field_id_to_projected_index
.as_ref()
.and_then(|m| m.get(&column_id).copied())
}
}
/// Gets the arrow schema to store in parquet.
@@ -771,7 +805,7 @@ mod tests {
fn test_convert_empty_record_batch() {
let metadata = build_test_region_metadata();
let arrow_schema = build_test_arrow_schema();
let read_format = ReadFormat::new(metadata);
let mut read_format = ReadFormat::new(metadata);
assert_eq!(arrow_schema, *read_format.arrow_schema());
let record_batch = RecordBatch::new_empty(arrow_schema);
@@ -785,7 +819,7 @@ mod tests {
#[test]
fn test_convert_record_batch() {
let metadata = build_test_region_metadata();
let read_format = ReadFormat::new(metadata);
let mut read_format = ReadFormat::new(metadata);
let columns: Vec<ArrayRef> = vec![
Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1


@@ -15,12 +15,17 @@
//! Parquet reader.
use std::collections::{BTreeSet, VecDeque};
use std::ops::BitAnd;
use std::sync::Arc;
use std::time::{Duration, Instant};
use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use common_time::range::TimestampRange;
use datafusion_common::arrow::array::BooleanArray;
use datafusion_common::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
@@ -36,11 +41,14 @@ use tokio::io::BufReader;
use crate::cache::CacheManagerRef;
use crate::error::{
ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu,
Result,
ArrowReaderSnafu, FieldTypeMismatchSnafu, FilterRecordBatchSnafu, InvalidMetadataSnafu,
InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu, Result,
};
use crate::metrics::{
PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::metrics::{READ_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::{Batch, BatchReader};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::sst::file::FileHandle;
use crate::sst::index::applier::SstIndexApplierRef;
use crate::sst::parquet::format::ReadFormat;
@@ -180,12 +188,31 @@ impl ParquetReaderBuilder {
..Default::default()
};
let predicate = if let Some(p) = &self.predicate {
p.exprs()
.iter()
.filter_map(|expr| SimpleFilterEvaluator::try_new(expr.df_expr()))
.collect()
} else {
vec![]
};
let codec = McmpRowCodec::new(
read_format
.metadata()
.primary_key_columns()
.map(|c| SortField::new(c.column_schema.data_type.clone()))
.collect(),
);
Ok(ParquetReader {
row_groups,
read_format,
reader_builder,
predicate,
current_reader: None,
batches: VecDeque::new(),
codec,
metrics,
})
}
@@ -316,6 +343,7 @@ struct Metrics {
num_row_groups_inverted_index_selected: usize,
/// Number of row groups to read after filtering by min-max index.
num_row_groups_min_max_selected: usize,
num_rows_precise_filtered: usize,
/// Duration to build the parquet reader.
build_cost: Duration,
/// Duration to scan the reader.
@@ -400,10 +428,14 @@ pub struct ParquetReader {
/// The builder contains the file handle, so don't drop the builder while using
/// the [ParquetReader].
reader_builder: RowGroupReaderBuilder,
/// Predicate pushed down to this reader.
predicate: Vec<SimpleFilterEvaluator>,
/// Reader of current row group.
current_reader: Option<ParquetRecordBatchReader>,
/// Buffered batches to return.
batches: VecDeque<Batch>,
/// Decoder for primary keys
codec: McmpRowCodec,
/// Local metrics.
metrics: Metrics,
}
@@ -419,16 +451,18 @@ impl BatchReader for ParquetReader {
}
// We need to fetch next record batch and convert it to batches.
let Some(record_batch) = self.fetch_next_record_batch().await? else {
self.metrics.scan_cost += start.elapsed();
return Ok(None);
};
self.metrics.num_record_batches += 1;
self.read_format
.convert_record_batch(&record_batch, &mut self.batches)?;
self.metrics.num_batches += self.batches.len();
while self.batches.is_empty() {
let Some(record_batch) = self.fetch_next_record_batch().await? else {
self.metrics.scan_cost += start.elapsed();
return Ok(None);
};
self.metrics.num_record_batches += 1;
self.read_format
.convert_record_batch(&record_batch, &mut self.batches)?;
self.prune_batches()?;
self.metrics.num_batches += self.batches.len();
}
let batch = self.batches.pop_front();
self.metrics.scan_cost += start.elapsed();
self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
@@ -467,6 +501,9 @@ impl Drop for ParquetReader {
READ_ROW_GROUPS_TOTAL
.with_label_values(&["min_max_index_selected"])
.inc_by(self.metrics.num_row_groups_min_max_selected as u64);
PRECISE_FILTER_ROWS_TOTAL
.with_label_values(&["parquet"])
.inc_by(self.metrics.num_rows_precise_filtered as u64);
}
}
@@ -515,6 +552,104 @@ impl ParquetReader {
Ok(None)
}
/// Prunes batches by the pushed down predicate.
fn prune_batches(&mut self) -> Result<()> {
// fast path
if self.predicate.is_empty() {
return Ok(());
}
let mut new_batches = VecDeque::new();
let batches = std::mem::take(&mut self.batches);
for batch in batches {
let num_rows_before_filter = batch.num_rows();
let Some(batch_filtered) = self.precise_filter(batch)? else {
// the entire batch is filtered out
self.metrics.num_rows_precise_filtered += num_rows_before_filter;
continue;
};
// update metric
let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
self.metrics.num_rows_precise_filtered += filtered_rows;
if !batch_filtered.is_empty() {
new_batches.push_back(batch_filtered);
}
}
self.batches = new_batches;
Ok(())
}
/// Tries its best to apply the pushed-down predicate precisely to the input batch.
/// Returns the filtered batch, or `None` if the entire batch is filtered out.
///
/// The supported filter expr types are defined in [SimpleFilterEvaluator].
///
/// When a filter references a primary key column, this method decodes
/// the primary key and stores the decoded values in the batch.
fn precise_filter(&self, mut input: Batch) -> Result<Option<Batch>> {
let mut mask = BooleanBuffer::new_set(input.num_rows());
// Run the filters one by one and combine their results
// TODO(ruihang): run primary key filter first. It may short circuit other filters
for filter in &self.predicate {
let column_name = filter.column_name();
let Some(column_metadata) = self.read_format.metadata().column_by_name(column_name)
else {
// Column not found; skip. This can happen when a column is added later.
continue;
};
let result = match column_metadata.semantic_type {
SemanticType::Tag => {
let pk_values = self.codec.decode(input.primary_key())?;
// Safety: this is a primary key
let pk_index = self
.read_format
.metadata()
.primary_key_index(column_metadata.column_id)
.unwrap();
let pk_value = pk_values[pk_index]
.try_to_scalar_value(&column_metadata.column_schema.data_type)
.context(FieldTypeMismatchSnafu)?;
if filter
.evaluate_scalar(&pk_value)
.context(FilterRecordBatchSnafu)?
{
input.set_pk_values(pk_values);
continue;
} else {
// A primary key mismatch means the entire batch is filtered out.
return Ok(None);
}
}
SemanticType::Field => {
let Some(field_index) = self
.read_format
.field_index_by_id(column_metadata.column_id)
else {
continue;
};
let field_col = &input.fields()[field_index].data;
filter
.evaluate_vector(field_col)
.context(FilterRecordBatchSnafu)?
}
SemanticType::Timestamp => filter
.evaluate_vector(input.timestamps())
.context(FilterRecordBatchSnafu)?,
};
mask = mask.bitand(&result);
}
input.filter(&BooleanArray::from(mask).into())?;
Ok(Some(input))
}
#[cfg(test)]
pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
self.reader_builder.parquet_meta.clone()


@@ -538,7 +538,8 @@ pub fn new_batch(
/// Ensure the reader returns batch as `expect`.
pub async fn check_reader_result<R: BatchReader>(reader: &mut R, expect: &[Batch]) {
let mut result = Vec::new();
while let Some(batch) = reader.next_batch().await.unwrap() {
while let Some(mut batch) = reader.next_batch().await.unwrap() {
batch.remove_pk_values();
result.push(batch);
}


@@ -114,7 +114,7 @@ pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle {
}
pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
assert!(end > start);
assert!(end >= start);
let pk = new_primary_key(tags);
let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
let sequences = vec![1000; end - start];


@@ -254,7 +254,10 @@ impl RegionMetadata {
.map(|id| self.column_by_id(*id).unwrap())
}
/// Returns all field columns.
/// Returns all field columns before projection.
///
/// **Use with caution**. On the read path, where a projection may be applied, this
/// method can return columns that are not present in the data batch.
pub fn field_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
self.column_metadatas
.iter()


@@ -48,6 +48,11 @@ impl Predicate {
Self { exprs }
}
/// Returns the logical exprs.
pub fn exprs(&self) -> &[Expr] {
&self.exprs
}
/// Builds physical exprs according to provided schema.
pub fn to_physical_exprs(
&self,
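
The new `exprs()` accessor is what lets the mito reader builder turn each pushed-down logical expr back into a SimpleFilterEvaluator (see the reader change above). Below is a hedged sketch of that consumption pattern, assuming the predicate and expr-wrapper APIs exactly as they appear in this diff; it is illustrative only.

use common_recordbatch::filter::SimpleFilterEvaluator;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use table::predicate::Predicate;

fn collect_simple_filters(predicate: &Predicate) -> Vec<SimpleFilterEvaluator> {
    predicate
        .exprs()
        .iter()
        // Unsupported exprs (compound, col-op-col, ...) are silently dropped.
        .filter_map(|expr| SimpleFilterEvaluator::try_new(expr.df_expr()))
        .collect()
}

fn example() -> Vec<SimpleFilterEvaluator> {
    // tag_0 = 'a', wrapped into a table predicate as the new reader tests do.
    let expr = Expr::BinaryExpr(BinaryExpr {
        left: Box::new(Expr::Column(Column {
            relation: None,
            name: "tag_0".to_string(),
        })),
        op: Operator::Eq,
        right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))),
    });
    collect_simple_filters(&Predicate::new(vec![expr.into()]))
}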