diff --git a/Cargo.lock b/Cargo.lock index 996469f1db..435e661236 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1916,6 +1916,7 @@ name = "common-recordbatch" version = "0.6.0" dependencies = [ "arc-swap", + "common-base", "common-error", "common-macro", "datafusion", diff --git a/src/common/recordbatch/Cargo.toml b/src/common/recordbatch/Cargo.toml index ed56a5b5cd..5425313bdb 100644 --- a/src/common/recordbatch/Cargo.toml +++ b/src/common/recordbatch/Cargo.toml @@ -6,6 +6,7 @@ license.workspace = true [dependencies] arc-swap = "1.6" +common-base.workspace = true common-error.workspace = true common-macro.workspace = true datafusion-common.workspace = true diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs index e5992c37d9..42a2754bb2 100644 --- a/src/common/recordbatch/src/error.rs +++ b/src/common/recordbatch/src/error.rs @@ -107,6 +107,16 @@ pub enum Error { location: Location, source: datatypes::error::Error, }, + + #[snafu(display("Error occurs when performing arrow computation"))] + ArrowCompute { + #[snafu(source)] + error: datatypes::arrow::error::ArrowError, + location: Location, + }, + + #[snafu(display("Unsupported operation: {}", reason))] + UnsupportedOperation { reason: String, location: Location }, } impl ErrorExt for Error { @@ -120,10 +130,13 @@ impl ErrorExt for Error { | Error::Format { .. } | Error::InitRecordbatchStream { .. } | Error::ColumnNotExists { .. } - | Error::ProjectArrowRecordBatch { .. } => StatusCode::Internal, + | Error::ProjectArrowRecordBatch { .. } + | Error::ArrowCompute { .. } => StatusCode::Internal, Error::External { source, .. } => source.status_code(), + Error::UnsupportedOperation { .. } => StatusCode::Unsupported, + Error::SchemaConversion { source, .. } | Error::CastVector { source, .. } => { source.status_code() } diff --git a/src/common/recordbatch/src/filter.rs b/src/common/recordbatch/src/filter.rs new file mode 100644 index 0000000000..3175ace37e --- /dev/null +++ b/src/common/recordbatch/src/filter.rs @@ -0,0 +1,258 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Util record batch stream wrapper that can perform precise filter. + +use datafusion::logical_expr::{Expr, Operator}; +use datafusion_common::arrow::array::{ArrayRef, Datum, Scalar}; +use datafusion_common::arrow::buffer::BooleanBuffer; +use datafusion_common::arrow::compute::kernels::cmp; +use datafusion_common::ScalarValue; +use datatypes::vectors::VectorRef; +use snafu::ResultExt; + +use crate::error::{ArrowComputeSnafu, Result, UnsupportedOperationSnafu}; + +/// An inplace expr evaluator for simple filter. Only support +/// - `col` `op` `literal` +/// - `literal` `op` `col` +/// +/// And the `op` is one of `=`, `!=`, `>`, `>=`, `<`, `<=`. +/// +/// This struct contains normalized predicate expr. In the form of +/// `col` `op` `literal` where the `col` is provided from input. +#[derive(Debug)] +pub struct SimpleFilterEvaluator { + /// Name of the referenced column. 
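+    /// For example, for a normalized predicate `ts > 1000` this holds `"ts"`.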
+ column_name: String, + /// The literal value. + literal: Scalar, + /// The operator. + op: Operator, +} + +impl SimpleFilterEvaluator { + pub fn try_new(predicate: &Expr) -> Option { + match predicate { + Expr::BinaryExpr(binary) => { + // check if the expr is in the supported form + match binary.op { + Operator::Eq + | Operator::NotEq + | Operator::Lt + | Operator::LtEq + | Operator::Gt + | Operator::GtEq => {} + _ => return None, + } + + // swap the expr if it is in the form of `literal` `op` `col` + let mut op = binary.op; + let (lhs, rhs) = match (&*binary.left, &*binary.right) { + (Expr::Column(ref col), Expr::Literal(ref lit)) => (col, lit), + (Expr::Literal(ref lit), Expr::Column(ref col)) => { + // safety: The previous check ensures the operator is able to swap. + op = op.swap().unwrap(); + (col, lit) + } + _ => return None, + }; + + Some(Self { + column_name: lhs.name.clone(), + literal: rhs.clone().to_scalar(), + op, + }) + } + _ => None, + } + } + + /// Get the name of the referenced column. + pub fn column_name(&self) -> &str { + &self.column_name + } + + pub fn evaluate_scalar(&self, input: &ScalarValue) -> Result { + let result = self.evaluate_datum(&input.to_scalar())?; + Ok(result.value(0)) + } + + pub fn evaluate_array(&self, input: &ArrayRef) -> Result { + self.evaluate_datum(input) + } + + pub fn evaluate_vector(&self, input: &VectorRef) -> Result { + self.evaluate_datum(&input.to_arrow_array()) + } + + fn evaluate_datum(&self, input: &impl Datum) -> Result { + let result = match self.op { + Operator::Eq => cmp::eq(input, &self.literal), + Operator::NotEq => cmp::neq(input, &self.literal), + Operator::Lt => cmp::lt(input, &self.literal), + Operator::LtEq => cmp::lt_eq(input, &self.literal), + Operator::Gt => cmp::gt(input, &self.literal), + Operator::GtEq => cmp::gt_eq(input, &self.literal), + _ => { + return UnsupportedOperationSnafu { + reason: format!("{:?}", self.op), + } + .fail() + } + }; + result + .context(ArrowComputeSnafu) + .map(|array| array.values().clone()) + } +} + +#[cfg(test)] +mod test { + + use std::sync::Arc; + + use datafusion::logical_expr::BinaryExpr; + use datafusion_common::Column; + + use super::*; + + #[test] + fn unsupported_filter_op() { + // `+` is not supported + let expr = Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::Column(Column { + relation: None, + name: "foo".to_string(), + })), + op: Operator::Plus, + right: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))), + }); + assert!(SimpleFilterEvaluator::try_new(&expr).is_none()); + + // two literal is not supported + let expr = Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))), + op: Operator::Eq, + right: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))), + }); + assert!(SimpleFilterEvaluator::try_new(&expr).is_none()); + + // two column is not supported + let expr = Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::Column(Column { + relation: None, + name: "foo".to_string(), + })), + op: Operator::Eq, + right: Box::new(Expr::Column(Column { + relation: None, + name: "bar".to_string(), + })), + }); + assert!(SimpleFilterEvaluator::try_new(&expr).is_none()); + + // compound expr is not supported + let expr = Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::Column(Column { + relation: None, + name: "foo".to_string(), + })), + op: Operator::Eq, + right: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))), + })), + op: Operator::Eq, + right: 
Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))), + }); + assert!(SimpleFilterEvaluator::try_new(&expr).is_none()); + } + + #[test] + fn supported_filter_op() { + // equal + let expr = Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::Column(Column { + relation: None, + name: "foo".to_string(), + })), + op: Operator::Eq, + right: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))), + }); + let _ = SimpleFilterEvaluator::try_new(&expr).unwrap(); + + // swap operands + let expr = Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))), + op: Operator::Lt, + right: Box::new(Expr::Column(Column { + relation: None, + name: "foo".to_string(), + })), + }); + let evaluator = SimpleFilterEvaluator::try_new(&expr).unwrap(); + assert_eq!(evaluator.op, Operator::Gt); + assert_eq!(evaluator.column_name, "foo".to_string()); + } + + #[test] + fn run_on_array() { + let expr = Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::Column(Column { + relation: None, + name: "foo".to_string(), + })), + op: Operator::Eq, + right: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))), + }); + let evaluator = SimpleFilterEvaluator::try_new(&expr).unwrap(); + + let input_1 = Arc::new(datatypes::arrow::array::Int64Array::from(vec![1, 2, 3])) as _; + let result = evaluator.evaluate_array(&input_1).unwrap(); + assert_eq!(result, BooleanBuffer::from(vec![true, false, false])); + + let input_2 = Arc::new(datatypes::arrow::array::Int64Array::from(vec![1, 1, 1])) as _; + let result = evaluator.evaluate_array(&input_2).unwrap(); + assert_eq!(result, BooleanBuffer::from(vec![true, true, true])); + + let input_3 = Arc::new(datatypes::arrow::array::Int64Array::new_null(0)) as _; + let result = evaluator.evaluate_array(&input_3).unwrap(); + assert_eq!(result, BooleanBuffer::from(vec![])); + } + + #[test] + fn run_on_scalar() { + let expr = Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::Column(Column { + relation: None, + name: "foo".to_string(), + })), + op: Operator::Lt, + right: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))), + }); + let evaluator = SimpleFilterEvaluator::try_new(&expr).unwrap(); + + let input_1 = ScalarValue::Int64(Some(1)); + let result = evaluator.evaluate_scalar(&input_1).unwrap(); + assert!(!result); + + let input_2 = ScalarValue::Int64(Some(0)); + let result = evaluator.evaluate_scalar(&input_2).unwrap(); + assert!(result); + + let input_3 = ScalarValue::Int64(None); + let result = evaluator.evaluate_scalar(&input_3).unwrap(); + assert!(!result); + } +} diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index 889046c166..6e87d3f6c2 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -14,6 +14,7 @@ pub mod adapter; pub mod error; +pub mod filter; mod recordbatch; pub mod util; diff --git a/src/mito2/src/engine/prune_test.rs b/src/mito2/src/engine/prune_test.rs index 29dde1e8b4..27a66d68c6 100644 --- a/src/mito2/src/engine/prune_test.rs +++ b/src/mito2/src/engine/prune_test.rs @@ -76,16 +76,11 @@ async fn test_read_parquet_stats() { +-------+---------+---------------------+ | tag_0 | field_0 | ts | +-------+---------+---------------------+ -| 0 | 0.0 | 1970-01-01T00:00:00 | -| 1 | 1.0 | 1970-01-01T00:00:01 | | 10 | 10.0 | 1970-01-01T00:00:10 | | 11 | 11.0 | 1970-01-01T00:00:11 | | 12 | 12.0 | 1970-01-01T00:00:12 | | 13 | 13.0 | 1970-01-01T00:00:13 | | 14 | 14.0 | 1970-01-01T00:00:14 | -| 2 | 2.0 | 1970-01-01T00:00:02 | -| 3 | 3.0 | 1970-01-01T00:00:03 | -| 4 | 4.0 | 
1970-01-01T00:00:04 | | 5 | 5.0 | 1970-01-01T00:00:05 | | 6 | 6.0 | 1970-01-01T00:00:06 | | 7 | 7.0 | 1970-01-01T00:00:07 | diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 5e1a983b9a..b84c783866 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -536,6 +536,12 @@ pub enum Error { error: std::io::Error, location: Location, }, + + #[snafu(display("Failed to filter record batch"))] + FilterRecordBatch { + source: common_recordbatch::error::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -632,6 +638,7 @@ impl ErrorExt for Error { CleanDir { .. } => StatusCode::Unexpected, InvalidConfig { .. } => StatusCode::InvalidArguments, StaleLogEntry { .. } => StatusCode::Unexpected, + FilterRecordBatch { source, .. } => source.status_code(), Upload { .. } => StatusCode::StorageUnavailable, } } diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 87244d4c31..27e2c39613 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -124,6 +124,9 @@ lazy_static! { /// Counter of row groups read. pub static ref READ_ROW_GROUPS_TOTAL: IntCounterVec = register_int_counter_vec!("greptime_mito_read_row_groups_total", "mito read row groups total", &[TYPE_LABEL]).unwrap(); + /// Counter of filtered rows by precise filter. + pub static ref PRECISE_FILTER_ROWS_TOTAL: IntCounterVec = + register_int_counter_vec!("greptime_mito_precise_filter_rows_total", "mito precise filter rows total", &[TYPE_LABEL]).unwrap(); // ------- End of query metrics. // Cache related metrics. diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index c3ce229780..a4b3d8dbe2 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -33,7 +33,7 @@ use datatypes::arrow::compute::SortOptions; use datatypes::arrow::row::{RowConverter, SortField}; use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector}; use datatypes::types::TimestampType; -use datatypes::value::ValueRef; +use datatypes::value::{Value, ValueRef}; use datatypes::vectors::{ BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, UInt8Vector, @@ -58,6 +58,8 @@ use crate::memtable::BoxedBatchIterator; pub struct Batch { /// Primary key encoded in a comparable form. primary_key: Vec, + /// Possibly decoded `primary_key` values. Some places would decode it in advance. + pk_values: Option>, /// Timestamps of rows, should be sorted and not null. timestamps: VectorRef, /// Sequences of rows @@ -104,6 +106,22 @@ impl Batch { &self.primary_key } + /// Returns possibly decoded primary-key values. + pub fn pk_values(&self) -> Option<&[Value]> { + self.pk_values.as_deref() + } + + /// Sets possibly decoded primary-key values. + pub fn set_pk_values(&mut self, pk_values: Vec) { + self.pk_values = Some(pk_values); + } + + /// Removes possibly decoded primary-key values. For testing only. + #[cfg(any(test, feature = "test"))] + pub fn remove_pk_values(&mut self) { + self.pk_values = None; + } + /// Returns fields in the batch. pub fn fields(&self) -> &[BatchColumn] { &self.fields @@ -195,6 +213,7 @@ impl Batch { // Now we need to clone the primary key. We could try `Bytes` if // this becomes a bottleneck. 
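            // Decoded pk values (if any) still apply to the slice because the primary key itself is unchanged.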
primary_key: self.primary_key.clone(), + pk_values: self.pk_values.clone(), timestamps: self.timestamps.slice(offset, length), sequences: Arc::new(self.sequences.get_slice(offset, length)), op_types: Arc::new(self.op_types.get_slice(offset, length)), @@ -653,6 +672,7 @@ impl BatchBuilder { Ok(Batch { primary_key: self.primary_key, + pk_values: None, timestamps, sequences, op_types, diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs index 5f4fd67edc..c849a2582a 100644 --- a/src/mito2/src/read/projection.rs +++ b/src/mito2/src/read/projection.rs @@ -177,10 +177,14 @@ impl ProjectionMapper { // Skips decoding pk if we don't need to output it. let pk_values = if self.has_tags { - self.codec - .decode(batch.primary_key()) - .map_err(BoxedError::new) - .context(ExternalSnafu)? + match batch.pk_values() { + Some(v) => v.to_vec(), + None => self + .codec + .decode(batch.primary_key()) + .map_err(BoxedError::new) + .context(ExternalSnafu)?, + } } else { Vec::new() }; diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 42c8eff87f..d0334690cc 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -77,6 +77,9 @@ mod tests { use std::sync::Arc; use common_time::Timestamp; + use datafusion_common::{Column, ScalarValue}; + use datafusion_expr::{BinaryExpr, Expr, Operator}; + use table::predicate::Predicate; use super::*; use crate::cache::{CacheManager, PageKey}; @@ -283,4 +286,140 @@ mod tests { offset_index, ); } + + #[tokio::test] + async fn test_read_with_tag_filter() { + let mut env = TestEnv::new(); + let object_store = env.init_object_store_manager(); + let handle = sst_file_handle(0, 1000); + let file_path = handle.file_path(FILE_DIR); + let metadata = Arc::new(sst_region_metadata()); + let source = new_source(&[ + new_batch_by_range(&["a", "d"], 0, 60), + new_batch_by_range(&["b", "f"], 0, 40), + new_batch_by_range(&["b", "h"], 100, 200), + ]); + // Use a small row group size for test. + let write_opts = WriteOptions { + row_group_size: 50, + ..Default::default() + }; + // Prepare data. + let mut writer = ParquetWriter::new( + file_path, + metadata.clone(), + object_store.clone(), + Indexer::default(), + ); + writer + .write_all(source, &write_opts) + .await + .unwrap() + .unwrap(); + + // Predicate + let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::Column(Column { + relation: None, + name: "tag_0".to_string(), + })), + op: Operator::Eq, + right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))), + }) + .into()])); + + let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store) + .predicate(predicate); + let mut reader = builder.build().await.unwrap(); + check_reader_result( + &mut reader, + &[ + new_batch_by_range(&["a", "d"], 0, 50), + new_batch_by_range(&["a", "d"], 50, 60), + ], + ) + .await; + } + + #[tokio::test] + async fn test_read_empty_batch() { + let mut env = TestEnv::new(); + let object_store = env.init_object_store_manager(); + let handle = sst_file_handle(0, 1000); + let file_path = handle.file_path(FILE_DIR); + let metadata = Arc::new(sst_region_metadata()); + let source = new_source(&[ + new_batch_by_range(&["a", "z"], 0, 0), + new_batch_by_range(&["a", "z"], 100, 100), + new_batch_by_range(&["a", "z"], 200, 230), + ]); + // Use a small row group size for test. + let write_opts = WriteOptions { + row_group_size: 50, + ..Default::default() + }; + // Prepare data. 
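        // The first two source batches are empty (start == end), so only the last range is expected back.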
+ let mut writer = ParquetWriter::new( + file_path, + metadata.clone(), + object_store.clone(), + Indexer::default(), + ); + writer + .write_all(source, &write_opts) + .await + .unwrap() + .unwrap(); + + let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store); + let mut reader = builder.build().await.unwrap(); + check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await; + } + + #[tokio::test] + async fn test_read_with_field_filter() { + let mut env = TestEnv::new(); + let object_store = env.init_object_store_manager(); + let handle = sst_file_handle(0, 1000); + let file_path = handle.file_path(FILE_DIR); + let metadata = Arc::new(sst_region_metadata()); + let source = new_source(&[ + new_batch_by_range(&["a", "d"], 0, 60), + new_batch_by_range(&["b", "f"], 0, 40), + new_batch_by_range(&["b", "h"], 100, 200), + ]); + // Use a small row group size for test. + let write_opts = WriteOptions { + row_group_size: 50, + ..Default::default() + }; + // Prepare data. + let mut writer = ParquetWriter::new( + file_path, + metadata.clone(), + object_store.clone(), + Indexer::default(), + ); + writer + .write_all(source, &write_opts) + .await + .unwrap() + .unwrap(); + + // Predicate + let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr { + left: Box::new(Expr::Column(Column { + relation: None, + name: "field_0".to_string(), + })), + op: Operator::GtEq, + right: Box::new(Expr::Literal(ScalarValue::UInt64(Some(150)))), + }) + .into()])); + + let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store) + .predicate(predicate); + let mut reader = builder.build().await.unwrap(); + check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await; + } } diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index 46bb003908..dd083047e0 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -118,8 +118,14 @@ pub(crate) struct ReadFormat { metadata: RegionMetadataRef, /// SST file schema. arrow_schema: SchemaRef, - // Field column id to its index in `schema` (SST schema). + /// Field column id to its index in `schema` (SST schema). + /// In SST schema, fields are stored in the front of the schema. field_id_to_index: HashMap, + /// Field column id to their index in the projected schema ( + /// the schema of [Batch]). + /// + /// This field is set at the first call to [convert_record_batch](Self::convert_record_batch). + field_id_to_projected_index: Option>, } impl ReadFormat { @@ -136,6 +142,7 @@ impl ReadFormat { metadata, arrow_schema, field_id_to_index, + field_id_to_projected_index: None, } } @@ -180,7 +187,7 @@ impl ReadFormat { /// /// Note that the `record_batch` may only contains a subset of columns if it is projected. 
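    /// The first call also records the mapping from field column ids to their indices in the
    /// projected schema, which backs [field_index_by_id](Self::field_index_by_id).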
pub(crate) fn convert_record_batch( - &self, + &mut self, record_batch: &RecordBatch, batches: &mut VecDeque, ) -> Result<()> { @@ -197,6 +204,10 @@ impl ReadFormat { } ); + if self.field_id_to_projected_index.is_none() { + self.init_id_to_projected_index(record_batch); + } + let mut fixed_pos_columns = record_batch .columns() .iter() @@ -259,6 +270,19 @@ impl ReadFormat { Ok(()) } + fn init_id_to_projected_index(&mut self, record_batch: &RecordBatch) { + let mut name_to_projected_index = HashMap::new(); + for (index, field) in record_batch.schema().fields().iter().enumerate() { + let Some(column) = self.metadata.column_by_name(field.name()) else { + continue; + }; + if column.semantic_type == SemanticType::Field { + name_to_projected_index.insert(column.column_id, index); + } + } + self.field_id_to_projected_index = Some(name_to_projected_index); + } + /// Returns min values of specific column in row groups. pub(crate) fn min_values( &self, @@ -478,15 +502,25 @@ impl ReadFormat { Some(Arc::new(UInt64Array::from_iter(values))) } - /// Field index of the primary key. + /// Index in SST of the primary key. fn primary_key_position(&self) -> usize { self.arrow_schema.fields.len() - 3 } - /// Field index of the time index. + /// Index in SST of the time index. fn time_index_position(&self) -> usize { self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM } + + /// Index of a field column by its column id. + /// This function is only available after the first call to + /// [convert_record_batch](Self::convert_record_batch). Otherwise + /// it always return `None` + pub fn field_index_by_id(&self, column_id: ColumnId) -> Option { + self.field_id_to_projected_index + .as_ref() + .and_then(|m| m.get(&column_id).copied()) + } } /// Gets the arrow schema to store in parquet. @@ -771,7 +805,7 @@ mod tests { fn test_convert_empty_record_batch() { let metadata = build_test_region_metadata(); let arrow_schema = build_test_arrow_schema(); - let read_format = ReadFormat::new(metadata); + let mut read_format = ReadFormat::new(metadata); assert_eq!(arrow_schema, *read_format.arrow_schema()); let record_batch = RecordBatch::new_empty(arrow_schema); @@ -785,7 +819,7 @@ mod tests { #[test] fn test_convert_record_batch() { let metadata = build_test_region_metadata(); - let read_format = ReadFormat::new(metadata); + let mut read_format = ReadFormat::new(metadata); let columns: Vec = vec![ Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1 diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index e07773d5af..6ba5d63af9 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -15,12 +15,17 @@ //! Parquet reader. 
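+//!
+//! Besides row group pruning, the reader applies the pushed down predicates precisely
+//! on the decoded batches (see `precise_filter`), covering tag, field and time index columns.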
use std::collections::{BTreeSet, VecDeque}; +use std::ops::BitAnd; use std::sync::Arc; use std::time::{Duration, Instant}; +use api::v1::SemanticType; use async_trait::async_trait; +use common_recordbatch::filter::SimpleFilterEvaluator; use common_telemetry::{debug, warn}; use common_time::range::TimestampRange; +use datafusion_common::arrow::array::BooleanArray; +use datafusion_common::arrow::buffer::BooleanBuffer; use datatypes::arrow::record_batch::RecordBatch; use object_store::ObjectStore; use parquet::arrow::arrow_reader::ParquetRecordBatchReader; @@ -36,11 +41,14 @@ use tokio::io::BufReader; use crate::cache::CacheManagerRef; use crate::error::{ - ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu, - Result, + ArrowReaderSnafu, FieldTypeMismatchSnafu, FilterRecordBatchSnafu, InvalidMetadataSnafu, + InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu, Result, +}; +use crate::metrics::{ + PRECISE_FILTER_ROWS_TOTAL, READ_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED, }; -use crate::metrics::{READ_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED}; use crate::read::{Batch, BatchReader}; +use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; use crate::sst::file::FileHandle; use crate::sst::index::applier::SstIndexApplierRef; use crate::sst::parquet::format::ReadFormat; @@ -180,12 +188,31 @@ impl ParquetReaderBuilder { ..Default::default() }; + let predicate = if let Some(p) = &self.predicate { + p.exprs() + .iter() + .filter_map(|expr| SimpleFilterEvaluator::try_new(expr.df_expr())) + .collect() + } else { + vec![] + }; + + let codec = McmpRowCodec::new( + read_format + .metadata() + .primary_key_columns() + .map(|c| SortField::new(c.column_schema.data_type.clone())) + .collect(), + ); + Ok(ParquetReader { row_groups, read_format, reader_builder, + predicate, current_reader: None, batches: VecDeque::new(), + codec, metrics, }) } @@ -316,6 +343,7 @@ struct Metrics { num_row_groups_inverted_index_selected: usize, /// Number of row groups to read after filtering by min-max index. num_row_groups_min_max_selected: usize, + num_rows_precise_filtered: usize, /// Duration to build the parquet reader. build_cost: Duration, /// Duration to scan the reader. @@ -400,10 +428,14 @@ pub struct ParquetReader { /// The builder contains the file handle, so don't drop the builder while using /// the [ParquetReader]. reader_builder: RowGroupReaderBuilder, + /// Predicate pushed down to this reader. + predicate: Vec, /// Reader of current row group. current_reader: Option, /// Buffered batches to return. batches: VecDeque, + /// Decoder for primary keys + codec: McmpRowCodec, /// Local metrics. metrics: Metrics, } @@ -419,16 +451,18 @@ impl BatchReader for ParquetReader { } // We need to fetch next record batch and convert it to batches. - let Some(record_batch) = self.fetch_next_record_batch().await? else { - self.metrics.scan_cost += start.elapsed(); - return Ok(None); - }; - self.metrics.num_record_batches += 1; - - self.read_format - .convert_record_batch(&record_batch, &mut self.batches)?; - self.metrics.num_batches += self.batches.len(); + while self.batches.is_empty() { + let Some(record_batch) = self.fetch_next_record_batch().await? 
else { + self.metrics.scan_cost += start.elapsed(); + return Ok(None); + }; + self.metrics.num_record_batches += 1; + self.read_format + .convert_record_batch(&record_batch, &mut self.batches)?; + self.prune_batches()?; + self.metrics.num_batches += self.batches.len(); + } let batch = self.batches.pop_front(); self.metrics.scan_cost += start.elapsed(); self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0); @@ -467,6 +501,9 @@ impl Drop for ParquetReader { READ_ROW_GROUPS_TOTAL .with_label_values(&["min_max_index_selected"]) .inc_by(self.metrics.num_row_groups_min_max_selected as u64); + PRECISE_FILTER_ROWS_TOTAL + .with_label_values(&["parquet"]) + .inc_by(self.metrics.num_rows_precise_filtered as u64); } } @@ -515,6 +552,104 @@ impl ParquetReader { Ok(None) } + /// Prunes batches by the pushed down predicate. + fn prune_batches(&mut self) -> Result<()> { + // fast path + if self.predicate.is_empty() { + return Ok(()); + } + + let mut new_batches = VecDeque::new(); + let batches = std::mem::take(&mut self.batches); + for batch in batches { + let num_rows_before_filter = batch.num_rows(); + let Some(batch_filtered) = self.precise_filter(batch)? else { + // the entire batch is filtered out + self.metrics.num_rows_precise_filtered += num_rows_before_filter; + continue; + }; + + // update metric + let filtered_rows = num_rows_before_filter - batch_filtered.num_rows(); + self.metrics.num_rows_precise_filtered += filtered_rows; + + if !batch_filtered.is_empty() { + new_batches.push_back(batch_filtered); + } + } + self.batches = new_batches; + + Ok(()) + } + + /// TRY THE BEST to perform pushed down predicate precisely on the input batch. + /// Return the filtered batch. If the entire batch is filtered out, return None. + /// + /// Supported filter expr type is defined in [SimpleFilterEvaluator]. + /// + /// When a filter is referencing primary key column, this method will decode + /// the primary key and put it into the batch. + fn precise_filter(&self, mut input: Batch) -> Result> { + let mut mask = BooleanBuffer::new_set(input.num_rows()); + + // Run filter one by one and combine them result + // TODO(ruihang): run primary key filter first. It may short circuit other filters + for filter in &self.predicate { + let column_name = filter.column_name(); + let Some(column_metadata) = self.read_format.metadata().column_by_name(column_name) + else { + // column not found, skip + // in situation like an column is added later + continue; + }; + let result = match column_metadata.semantic_type { + SemanticType::Tag => { + let pk_values = self.codec.decode(input.primary_key())?; + // Safety: this is a primary key + let pk_index = self + .read_format + .metadata() + .primary_key_index(column_metadata.column_id) + .unwrap(); + let pk_value = pk_values[pk_index] + .try_to_scalar_value(&column_metadata.column_schema.data_type) + .context(FieldTypeMismatchSnafu)?; + if filter + .evaluate_scalar(&pk_value) + .context(FilterRecordBatchSnafu)? + { + input.set_pk_values(pk_values); + continue; + } else { + // PK not match means the entire batch is filtered out. + return Ok(None); + } + } + SemanticType::Field => { + let Some(field_index) = self + .read_format + .field_index_by_id(column_metadata.column_id) + else { + continue; + }; + let field_col = &input.fields()[field_index].data; + filter + .evaluate_vector(field_col) + .context(FilterRecordBatchSnafu)? 
+ } + SemanticType::Timestamp => filter + .evaluate_vector(input.timestamps()) + .context(FilterRecordBatchSnafu)?, + }; + + mask = mask.bitand(&result); + } + + input.filter(&BooleanArray::from(mask).into())?; + + Ok(Some(input)) + } + #[cfg(test)] pub fn parquet_metadata(&self) -> Arc { self.reader_builder.parquet_meta.clone() diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 63d73c776e..6164861b99 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -538,7 +538,8 @@ pub fn new_batch( /// Ensure the reader returns batch as `expect`. pub async fn check_reader_result(reader: &mut R, expect: &[Batch]) { let mut result = Vec::new(); - while let Some(batch) = reader.next_batch().await.unwrap() { + while let Some(mut batch) = reader.next_batch().await.unwrap() { + batch.remove_pk_values(); result.push(batch); } diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index 7f4e774175..30a9db1f90 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -114,7 +114,7 @@ pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle { } pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch { - assert!(end > start); + assert!(end >= start); let pk = new_primary_key(tags); let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect(); let sequences = vec![1000; end - start]; diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index 86ae565318..0ca9109514 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -254,7 +254,10 @@ impl RegionMetadata { .map(|id| self.column_by_id(*id).unwrap()) } - /// Returns all field columns. + /// Returns all field columns before projection. + /// + /// **Use with caution**. On read path where might have projection, this method + /// can return columns that not present in data batch. pub fn field_columns(&self) -> impl Iterator { self.column_metadatas .iter() diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs index a8377d382e..514541f2dd 100644 --- a/src/table/src/predicate.rs +++ b/src/table/src/predicate.rs @@ -48,6 +48,11 @@ impl Predicate { Self { exprs } } + /// Returns the logical exprs. + pub fn exprs(&self) -> &[Expr] { + &self.exprs + } + /// Builds physical exprs according to provided schema. pub fn to_physical_exprs( &self,
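
For reference, a minimal usage sketch of the new `SimpleFilterEvaluator` that the parquet reader's `precise_filter` builds on. It mirrors the unit tests in `src/common/recordbatch/src/filter.rs`; the column name `foo`, the literal `42`, and the helper function name are illustrative only.

use std::sync::Arc;

use common_recordbatch::filter::SimpleFilterEvaluator;
use datafusion::logical_expr::{BinaryExpr, Expr, Operator};
use datafusion_common::arrow::array::{ArrayRef, Int64Array};
use datafusion_common::arrow::buffer::BooleanBuffer;
use datafusion_common::{Column, ScalarValue};

fn simple_filter_sketch() {
    // Normalized form accepted by the evaluator: `col` `op` `literal`, here `foo < 42`.
    let expr = Expr::BinaryExpr(BinaryExpr {
        left: Box::new(Expr::Column(Column {
            relation: None,
            name: "foo".to_string(),
        })),
        op: Operator::Lt,
        right: Box::new(Expr::Literal(ScalarValue::Int64(Some(42)))),
    });
    let evaluator = SimpleFilterEvaluator::try_new(&expr).expect("supported filter form");

    // Evaluating against an arrow array yields a boolean mask, one bit per row.
    let input: ArrayRef = Arc::new(Int64Array::from(vec![1, 100, 7]));
    let mask = evaluator.evaluate_array(&input).unwrap();
    assert_eq!(mask, BooleanBuffer::from(vec![true, false, true]));

    // Scalars can be tested directly as well, e.g. a decoded primary key value.
    assert!(evaluator.evaluate_scalar(&ScalarValue::Int64(Some(7))).unwrap());
}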