diff --git a/Cargo.lock b/Cargo.lock
index dafdb2e3b6..ab6ecd3918 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4897,6 +4897,7 @@ dependencies = [
  "bytes",
  "common-base",
  "common-error",
+ "common-query",
  "common-runtime",
  "common-telemetry",
  "common-time",
@@ -4916,6 +4917,7 @@ dependencies = [
  "serde_json",
  "snafu",
  "store-api",
+ "table",
  "tempdir",
  "tokio",
  "tonic",
@@ -4932,6 +4934,7 @@ dependencies = [
  "bytes",
  "common-base",
  "common-error",
+ "common-query",
  "common-time",
  "datatypes",
  "derive_builder",
@@ -5090,14 +5093,21 @@ dependencies = [
  "common-error",
  "common-query",
  "common-recordbatch",
+ "common-telemetry",
  "datafusion",
  "datafusion-common",
+ "datafusion-expr",
  "datatypes",
  "derive_builder",
  "futures",
+ "parquet-format-async-temp",
+ "paste",
  "serde",
  "snafu",
  "store-api",
+ "tempdir",
+ "tokio",
+ "tokio-util",
 ]
 
 [[package]]
@@ -5447,6 +5457,7 @@ checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740"
 dependencies = [
  "bytes",
  "futures-core",
+ "futures-io",
  "futures-sink",
  "pin-project-lite",
  "tokio",
diff --git a/src/query/src/datafusion/plan_adapter.rs b/src/query/src/datafusion/plan_adapter.rs
index 69852fb7e7..d71f2b9f6f 100644
--- a/src/query/src/datafusion/plan_adapter.rs
+++ b/src/query/src/datafusion/plan_adapter.rs
@@ -1,5 +1,5 @@
 use std::any::Any;
-use std::fmt::{self, Debug};
+use std::fmt::Debug;
 use std::sync::Arc;
 
 use common_recordbatch::SendableRecordBatchStream;
@@ -22,6 +22,7 @@ use crate::executor::Runtime;
 use crate::plan::{Partitioning, PhysicalPlan};
 
 /// Datafusion ExecutionPlan -> greptime PhysicalPlan
+#[derive(Debug)]
 pub struct PhysicalPlanAdapter {
     plan: Arc<dyn ExecutionPlan>,
     schema: SchemaRef,
@@ -109,18 +110,12 @@ impl PhysicalPlan for PhysicalPlanAdapter {
 }
 
 /// Greptime PhysicalPlan -> datafusion ExecutionPlan.
+#[derive(Debug)]
 struct ExecutionPlanAdapter {
     plan: Arc<dyn PhysicalPlan>,
     schema: SchemaRef,
 }
 
-impl Debug for ExecutionPlanAdapter {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        //TODO(dennis) better debug info
-        write!(f, "ExecutionPlan(PlaceHolder)")
-    }
-}
-
 #[async_trait::async_trait]
 impl ExecutionPlan for ExecutionPlanAdapter {
     fn as_any(&self) -> &dyn Any {
@@ -182,3 +177,39 @@ impl ExecutionPlan for ExecutionPlanAdapter {
         Statistics::default()
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use arrow::datatypes::Field;
+    use datafusion::physical_plan::empty::EmptyExec;
+    use datafusion_common::field_util::SchemaExt;
+    use datatypes::schema::Schema;
+
+    use super::*;
+
+    #[test]
+    fn test_physical_plan_adapter() {
+        let arrow_schema = arrow::datatypes::Schema::new(vec![Field::new(
+            "name",
+            arrow::datatypes::DataType::Utf8,
+            true,
+        )]);
+
+        let schema = Arc::new(Schema::try_from(arrow_schema.clone()).unwrap());
+        let physical_plan = PhysicalPlanAdapter::new(
+            schema.clone(),
+            Arc::new(EmptyExec::new(true, Arc::new(arrow_schema))),
+        );
+
+        assert!(physical_plan
+            .plan
+            .as_any()
+            .downcast_ref::<EmptyExec>()
+            .is_some());
+        let execution_plan_adapter = ExecutionPlanAdapter {
+            plan: Arc::new(physical_plan),
+            schema: schema.clone(),
+        };
+        assert_eq!(schema, execution_plan_adapter.schema);
+    }
+}
diff --git a/src/query/src/plan.rs b/src/query/src/plan.rs
index 9a1b98a945..d26c0cfa73 100644
--- a/src/query/src/plan.rs
+++ b/src/query/src/plan.rs
@@ -1,4 +1,5 @@
 use std::any::Any;
+use std::fmt::Debug;
 use std::sync::Arc;
 
 use common_recordbatch::SendableRecordBatchStream;
@@ -39,7 +40,7 @@ impl Partitioning {
 }
 
 #[async_trait::async_trait]
-pub trait PhysicalPlan: Send + Sync + Any {
+pub trait PhysicalPlan: Send + Sync + Any + Debug {
     /// Get the schema for this execution plan
     fn schema(&self) -> SchemaRef;
diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml
index bb3e83149c..0fda93c141 100644
--- a/src/storage/Cargo.toml
+++ b/src/storage/Cargo.toml
@@ -12,6 +12,7 @@ async-trait = "0.1"
 bytes = "1.1"
 common-base = { path = "../common/base" }
 common-error = { path = "../common/error" }
+common-query = { path = "../common/query" }
 common-runtime = { path = "../common/runtime" }
 common-telemetry = { path = "../common/telemetry" }
 common-time = { path = "../common/time" }
@@ -29,6 +30,7 @@ serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 snafu = { version = "0.7", features = ["backtraces"] }
 store-api = { path = "../store-api" }
+table = { path = "../table" }
 tokio = { version = "1.18", features = ["full"] }
 tonic = "0.8"
 uuid = { version = "1.1", features = ["v4"] }
diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs
index 8ca8f23b2e..3a94fb34d5 100644
--- a/src/storage/src/chunk.rs
+++ b/src/storage/src/chunk.rs
@@ -1,8 +1,10 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use common_query::logical_plan::Expr;
 use snafu::ResultExt;
 use store_api::storage::{Chunk, ChunkReader, SchemaRef, SequenceNumber};
+use table::predicate::Predicate;
 
 use crate::error::{self, Error, Result};
 use crate::memtable::{IterContext, MemtableRef, MemtableSet};
@@ -49,6 +51,7 @@ impl ChunkReaderImpl {
 pub struct ChunkReaderBuilder {
     schema: RegionSchemaRef,
     projection: Option<Vec<usize>>,
+    filters: Vec<Expr>,
     sst_layer: AccessLayerRef,
     iter_ctx: IterContext,
     memtables: Vec<MemtableRef>,
@@ -60,6 +63,7 @@ impl ChunkReaderBuilder {
         ChunkReaderBuilder {
             schema,
             projection: None,
+            filters: vec![],
             sst_layer,
             iter_ctx: IterContext::default(),
             memtables: Vec::new(),
@@ -78,6 +82,11 @@ impl ChunkReaderBuilder {
         self
     }
 
+    pub fn filters(mut self, filters: Vec<Expr>) -> Self {
+        self.filters = filters;
+        self
+    }
+
     pub fn batch_size(mut self, batch_size: usize) -> Self {
         self.iter_ctx.batch_size = batch_size;
         self
@@ -121,6 +130,7 @@ impl ChunkReaderBuilder {
         let read_opts = ReadOptions {
             batch_size: self.iter_ctx.batch_size,
             projected_schema: schema.clone(),
+            predicate: Predicate::new(self.filters),
         };
         for file in &self.files_to_read {
             let reader = self
diff --git a/src/storage/src/snapshot.rs b/src/storage/src/snapshot.rs
index ac3f343ff6..110573100f 100644
--- a/src/storage/src/snapshot.rs
+++ b/src/storage/src/snapshot.rs
@@ -43,6 +43,7 @@ impl Snapshot for SnapshotImpl {
         ChunkReaderBuilder::new(self.version.schema().clone(), self.sst_layer.clone())
             .reserve_num_memtables(memtable_version.num_memtables())
             .projection(request.projection)
+            .filters(request.filters)
             .batch_size(ctx.batch_size)
             .visible_sequence(visible_sequence)
             .pick_memtables(mutables);
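Editorial note (not part of the patch): the pushed-down filters travel as `common_query::logical_plan::Expr` values inside `ScanRequest`, and `SnapshotImpl` simply forwards them into `ChunkReaderBuilder::filters`, where they are wrapped in a `Predicate` for the SST layer. A minimal sketch of the calling side, assuming `ScanRequest` is re-exported from `store_api::storage` as it is elsewhere in this patch; the helper name is illustrative only:

```rust
use common_query::logical_plan::Expr;
use store_api::storage::ScanRequest;

/// Hypothetical helper: bundle pushed-down filters into the scan request,
/// exactly as the snapshot change above expects to find them.
fn scan_request_with_filters(projection: Option<Vec<usize>>, filters: Vec<Expr>) -> ScanRequest {
    ScanRequest {
        projection,
        filters,
        ..Default::default()
    }
}
```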
diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs
index a04bc6d642..3d2123bdb6 100644
--- a/src/storage/src/sst.rs
+++ b/src/storage/src/sst.rs
@@ -5,6 +5,7 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use object_store::{util, ObjectStore};
 use serde::{Deserialize, Serialize};
+use table::predicate::Predicate;
 
 use crate::error::Result;
 use crate::memtable::BoxedBatchIterator;
@@ -170,13 +171,14 @@ pub struct WriteOptions {
     // TODO(yingwen): [flush] row group size.
 }
 
-#[derive(Debug)]
 pub struct ReadOptions {
     /// Suggested size of each batch.
     pub batch_size: usize,
     /// The schema that user expected to read, might not the same as the
     /// schema of the SST file.
     pub projected_schema: ProjectedSchemaRef,
+
+    pub predicate: Predicate,
 }
 
 /// SST access layer.
@@ -240,6 +242,7 @@ impl AccessLayer for FsAccessLayer {
             &file_path,
             self.object_store.clone(),
             opts.projected_schema.clone(),
+            opts.predicate.clone(),
         );
 
         let stream = reader.chunk_stream(opts.batch_size).await?;
diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs
index 3dcfce7291..afa3d1f860 100644
--- a/src/storage/src/sst/parquet.rs
+++ b/src/storage/src/sst/parquet.rs
@@ -6,6 +6,7 @@ use std::sync::Arc;
 
 use async_stream::try_stream;
 use async_trait::async_trait;
+use common_telemetry::debug;
 use datatypes::arrow::array::Array;
 use datatypes::arrow::chunk::Chunk;
 use datatypes::arrow::datatypes::{DataType, Field, Schema};
@@ -19,6 +20,7 @@ use futures_util::sink::SinkExt;
 use futures_util::{Stream, TryStreamExt};
 use object_store::{ObjectStore, SeekableReader};
 use snafu::ResultExt;
+use table::predicate::Predicate;
 
 use crate::error::{self, Result};
 use crate::memtable::BoxedBatchIterator;
@@ -157,6 +159,7 @@ pub struct ParquetReader<'a> {
     file_path: &'a str,
     object_store: ObjectStore,
     projected_schema: ProjectedSchemaRef,
+    predicate: Predicate,
 }
 
 type ReaderFactoryFuture<'a, R> =
@@ -167,11 +170,13 @@ impl<'a> ParquetReader<'a> {
         file_path: &str,
         object_store: ObjectStore,
         projected_schema: ProjectedSchemaRef,
+        predicate: Predicate,
     ) -> ParquetReader {
         ParquetReader {
             file_path,
             object_store,
             projected_schema,
+            predicate,
         }
     }
@@ -191,19 +196,31 @@ impl<'a> ParquetReader<'a> {
         let metadata = read_metadata_async(&mut reader)
             .await
             .context(error::ReadParquetSnafu { file: &file_path })?;
+
         let arrow_schema =
             infer_schema(&metadata).context(error::ReadParquetSnafu { file: &file_path })?;
+
         // Now the StoreSchema is only used to validate metadata of the parquet file, but this schema
         // would be useful once we support altering schema, as this is the actual schema of the SST.
-        let _store_schema = StoreSchema::try_from(arrow_schema)
+        let store_schema = StoreSchema::try_from(arrow_schema)
             .context(error::ConvertStoreSchemaSnafu { file: &file_path })?;
 
+        let pruned_row_groups = self
+            .predicate
+            .prune_row_groups(store_schema.schema().clone(), &metadata.row_groups);
+
         let projected_fields = self.projected_fields().to_vec();
         let chunk_stream = try_stream!({
-            for rg in metadata.row_groups {
+            for (idx, valid) in pruned_row_groups.iter().enumerate() {
+                if !valid {
+                    debug!("Row group {} is pruned by the predicate", idx);
+                    continue;
+                }
+
+                let rg = &metadata.row_groups[idx];
                 let column_chunks = read_columns_many_async(
                     &reader_factory,
-                    &rg,
+                    rg,
                     projected_fields.clone(),
                     Some(chunk_size),
                 )
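Editorial note (not part of the patch): the reader above treats the `Vec<bool>` returned by `prune_row_groups` as a keep/skip mask positionally aligned with the file's row groups — `true` means the group may still contain matching rows, `false` means its statistics prove it cannot. A standalone sketch of that contract, in plain Rust with no GreptimeDB types:

```rust
/// Keep only the row groups whose mask entry is `true`.
/// `mask.len()` must equal `row_groups.len()`; a `false` entry means the
/// group's min/max statistics rule out every row for the pushed-down filters.
fn select_row_groups<T>(row_groups: &[T], mask: &[bool]) -> Vec<usize> {
    assert_eq!(row_groups.len(), mask.len());
    mask.iter()
        .enumerate()
        .filter_map(|(idx, keep)| if *keep { Some(idx) } else { None })
        .collect()
}

fn main() {
    // Four row groups; statistics ruled out the first two.
    let groups = ["rg0", "rg1", "rg2", "rg3"];
    let mask = [false, false, true, true];
    assert_eq!(select_row_groups(&groups, &mask), vec![2, 3]);
}
```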
diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml
index b07f45d259..0258a660eb 100644
--- a/src/store-api/Cargo.toml
+++ b/src/store-api/Cargo.toml
@@ -9,6 +9,7 @@ async-trait = "0.1"
 bytes = "1.1"
 common-base = { path = "../common/base" }
 common-error = { path = "../common/error" }
+common-query = { path = "../common/query" }
 common-time = { path = "../common/time" }
 datatypes = { path = "../datatypes" }
 derive_builder = "0.11"
diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs
index e8f727b31b..1309cc4baa 100644
--- a/src/store-api/src/storage/requests.rs
+++ b/src/store-api/src/storage/requests.rs
@@ -1,6 +1,7 @@
 use std::time::Duration;
 
 use common_error::ext::ErrorExt;
+use common_query::logical_plan::Expr;
 use common_time::RangeMillis;
 use datatypes::vectors::VectorRef;
 
@@ -34,7 +35,7 @@ pub trait PutOperation: Send {
     fn add_value_column(&mut self, name: &str, vector: VectorRef) -> Result<(), Self::Error>;
 }
 
-#[derive(Debug, Default)]
+#[derive(Default)]
 pub struct ScanRequest {
     /// Max sequence number to read, None for latest sequence.
     ///
@@ -43,6 +44,8 @@ pub struct ScanRequest {
     pub sequence: Option<SequenceNumber>,
     /// Indices of columns to read, `None` to read all columns.
     pub projection: Option<Vec<usize>>,
+    /// Filters pushed down
+    pub filters: Vec<Expr>,
 }
 
 #[derive(Debug)]
diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs
index 5712d011dc..d6190d2628 100644
--- a/src/table-engine/src/table.rs
+++ b/src/table-engine/src/table.rs
@@ -24,7 +24,7 @@ use store_api::storage::{
     WriteContext, WriteRequest,
 };
 use table::error::{Error as TableError, MissingColumnSnafu, Result as TableResult};
-use table::metadata::{TableInfoRef, TableMetaBuilder};
+use table::metadata::{FilterPushDownType, TableInfoRef, TableMetaBuilder};
 use table::requests::{AlterKind, AlterTableRequest, InsertRequest};
 use table::{
     metadata::{TableInfo, TableType},
@@ -118,16 +118,17 @@ impl<R: Region> Table for MitoTable<R> {
     async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
-        _filters: &[Expr],
+        filters: &[Expr],
         _limit: Option<usize>,
     ) -> TableResult<PhysicalPlanRef> {
         let read_ctx = ReadContext::default();
         let snapshot = self.region.snapshot(&read_ctx).map_err(TableError::new)?;
 
         let projection = self.transform_projection(&self.region, projection.clone())?;
-
+        let filters = filters.into();
         let scan_request = ScanRequest {
             projection,
+            filters,
             ..Default::default()
         };
         let mut reader = snapshot
@@ -243,6 +244,10 @@ impl<R: Region> Table for MitoTable<R> {
         // table cannot be hold.
         Ok(())
     }
+
+    fn supports_filter_pushdown(&self, _filter: &Expr) -> table::error::Result<FilterPushDownType> {
+        Ok(FilterPushDownType::Inexact)
+    }
 }
 
 fn build_table_schema_with_new_column(
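Editorial note (not part of the patch): answering `FilterPushDownType::Inexact` is what keeps this change correct — row-group pruning can only skip groups whose statistics rule every row out, so the scan may still return non-matching rows and the query engine must re-apply the filter above the scan. A hedged sketch of the distinction; the variant names are assumed to mirror DataFusion's `TableProviderFilterPushDown`, not necessarily the exact definition in `table::metadata`:

```rust
// Illustrative only: how a planner might interpret the pushdown answer.
enum FilterPushDownType {
    Unsupported, // filter stays entirely in the query engine
    Inexact,     // scan uses the filter as a hint (e.g. row-group pruning) but may return extra rows
    Exact,       // scan guarantees only matching rows; the filter can be dropped above the scan
}

fn must_refilter(pushdown: &FilterPushDownType) -> bool {
    !matches!(pushdown, FilterPushDownType::Exact)
}

fn main() {
    // MitoTable answers `Inexact` above, so the engine keeps the filter node in the plan.
    assert!(must_refilter(&FilterPushDownType::Inexact));
    assert!(!must_refilter(&FilterPushDownType::Exact));
}
```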
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index c53720e37f..21f1023d3b 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -9,11 +9,20 @@ chrono = { version = "0.4", features = ["serde"] }
 common-error = { path = "../common/error" }
 common-query = { path = "../common/query" }
 common-recordbatch = { path = "../common/recordbatch" }
+common-telemetry = { path = "../common/telemetry" }
 datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
 datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
 datatypes = { path = "../datatypes" }
 derive_builder = "0.11"
 futures = "0.3"
+parquet-format-async-temp = "0.2"
+paste = "1.0"
 serde = "1.0.136"
 snafu = { version = "0.7", features = ["backtraces"] }
 store-api = { path = "../store-api" }
+
+[dev-dependencies]
+datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
+tempdir = "0.3"
+tokio = { version = "1.18", features = ["full"] }
+tokio-util = { version = "0.7", features = ["compat"] }
diff --git a/src/table/src/lib.rs b/src/table/src/lib.rs
index 248e813a12..f02821500d 100644
--- a/src/table/src/lib.rs
+++ b/src/table/src/lib.rs
@@ -1,6 +1,7 @@
 pub mod engine;
 pub mod error;
 pub mod metadata;
+pub mod predicate;
 pub mod requests;
 pub mod table;
diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs
new file mode 100644
index 0000000000..69069cb83b
--- /dev/null
+++ b/src/table/src/predicate.rs
@@ -0,0 +1,227 @@
+mod stats;
+
+use common_query::logical_plan::Expr;
+use common_telemetry::{error, warn};
+use datafusion::physical_optimizer::pruning::PruningPredicate;
+use datatypes::arrow::io::parquet::read::RowGroupMetaData;
+use datatypes::schema::SchemaRef;
+
+use crate::predicate::stats::RowGroupPruningStatistics;
+
+#[derive(Default, Clone)]
+pub struct Predicate {
+    exprs: Vec<Expr>,
+}
+
+impl Predicate {
+    pub fn new(exprs: Vec<Expr>) -> Self {
+        Self { exprs }
+    }
+
+    pub fn empty() -> Self {
+        Self { exprs: vec![] }
+    }
+
+    pub fn prune_row_groups(
+        &self,
+        schema: SchemaRef,
+        row_groups: &[RowGroupMetaData],
+    ) -> Vec<bool> {
+        let mut res = vec![true; row_groups.len()];
+        for expr in &self.exprs {
+            match PruningPredicate::try_new(expr.df_expr().clone(), schema.arrow_schema().clone()) {
+                Ok(p) => {
+                    let stat = RowGroupPruningStatistics::new(row_groups, &schema);
+                    match p.prune(&stat) {
+                        Ok(r) => {
+                            for (curr_val, res) in r.into_iter().zip(res.iter_mut()) {
+                                *res &= curr_val
+                            }
+                        }
+                        Err(e) => {
+                            warn!("Failed to prune row groups, error: {:?}", e);
+                        }
+                    }
+                }
+                Err(e) => {
+                    error!("Failed to create predicate for expr, error: {:?}", e);
+                }
+            }
+        }
+        res
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    pub use datafusion::parquet::schema::types::{BasicTypeInfo, PhysicalType};
+    use datafusion_common::Column;
+    use datafusion_expr::Expr;
+    use datafusion_expr::Literal;
+    use datafusion_expr::Operator;
+    use datatypes::arrow::array::{Int32Array, Utf8Array};
+    use datatypes::arrow::chunk::Chunk;
+    use datatypes::arrow::datatypes::{DataType, Field, Schema};
+    use datatypes::arrow::io::parquet::read::FileReader;
+    use datatypes::arrow::io::parquet::write::{
+        Compression, Encoding, FileSink, Version, WriteOptions,
+    };
+    use futures::SinkExt;
+    use tempdir::TempDir;
+    use tokio_util::compat::TokioAsyncReadCompatExt;
+
+    use super::*;
+
+    async fn gen_test_parquet_file(dir: &TempDir, cnt: usize) -> (String, Arc<Schema>) {
+        let path = dir
+            .path()
+            .join("test-prune.parquet")
+            .to_string_lossy()
+            .to_string();
+
+        let name_field = Field::new("name", DataType::Utf8, true);
+        let count_field = Field::new("cnt", DataType::Int32, true);
+
+        let schema = Schema::from(vec![name_field, count_field]);
+
+        // For now all physical types use plain encoding; maybe let the caller choose the encoding for each type.
+        let encodings = vec![Encoding::Plain].repeat(schema.fields.len());
+
+        let writer = tokio::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .open(&path)
+            .await
+            .unwrap()
+            .compat();
+
+        let mut sink = FileSink::try_new(
+            writer,
+            schema.clone(),
+            encodings,
+            WriteOptions {
+                write_statistics: true,
+                compression: Compression::Gzip,
+                version: Version::V2,
+            },
+        )
+        .unwrap();
+
+        for i in (0..cnt).step_by(10) {
+            let name_array = Utf8Array::<i32>::from(
+                &(i..(i + 10).min(cnt))
+                    .map(|i| Some(i.to_string()))
+                    .collect::<Vec<_>>(),
+            );
+            let count_array = Int32Array::from(
+                &(i..(i + 10).min(cnt))
+                    .map(|i| Some(i as i32))
+                    .collect::<Vec<_>>(),
+            );
+
+            sink.send(Chunk::new(vec![
+                Arc::new(name_array),
+                Arc::new(count_array),
+            ]))
+            .await
+            .unwrap();
+        }
+        sink.close().await.unwrap();
+        (path, Arc::new(schema))
+    }
+
+    async fn assert_prune(array_cnt: usize, predicate: Predicate, expect: Vec<bool>) {
+        let dir = TempDir::new("prune_parquet").unwrap();
+        let (path, schema) = gen_test_parquet_file(&dir, array_cnt).await;
+        let file_reader =
+            FileReader::try_new(std::fs::File::open(path).unwrap(), None, None, None, None)
+                .unwrap();
+
+        let schema = Arc::new(datatypes::schema::Schema::try_from(schema).unwrap());
+
+        let vec = file_reader.metadata().row_groups.clone();
+        let res = predicate.prune_row_groups(schema, &vec);
+        assert_eq!(expect, res);
+    }
+
+    fn gen_predicate(max_val: i32, op: Operator) -> Predicate {
+        Predicate::new(vec![Expr::BinaryExpr {
+            left: Box::new(Expr::Column(Column::from_name("cnt".to_string()))),
+            op,
+            right: Box::new(max_val.lit()),
+        }
+        .into()])
+    }
+
+    #[tokio::test]
+    async fn test_prune_empty() {
+        assert_prune(3, Predicate::empty(), vec![true]).await;
+    }
+
+    #[tokio::test]
+    async fn test_prune_all_match() {
+        let p = gen_predicate(3, Operator::Gt);
+        assert_prune(2, p, vec![false]).await;
+    }
+
+    #[tokio::test]
+    async fn test_prune_gt() {
+        let p = gen_predicate(29, Operator::Gt);
+        assert_prune(
+            100,
+            p,
+            vec![
+                false, false, false, true, true, true, true, true, true, true,
+            ],
+        )
+        .await;
+    }
+
+    #[tokio::test]
+    async fn test_prune_eq_expr() {
+        let p = gen_predicate(30, Operator::Eq);
+        assert_prune(40, p, vec![false, false, false, true]).await;
+    }
+
+    #[tokio::test]
+    async fn test_prune_neq_expr() {
+        let p = gen_predicate(30, Operator::NotEq);
+        assert_prune(40, p, vec![true, true, true, true]).await;
+    }
+
+    #[tokio::test]
+    async fn test_prune_gteq_expr() {
+        let p = gen_predicate(29, Operator::GtEq);
+        assert_prune(40, p, vec![false, false, true, true]).await;
+    }
+
+    #[tokio::test]
+    async fn test_prune_lt_expr() {
+        let p = gen_predicate(30, Operator::Lt);
+        assert_prune(40, p, vec![true, true, true, false]).await;
+    }
+
+    #[tokio::test]
+    async fn test_prune_lteq_expr() {
+        let p = gen_predicate(30, Operator::LtEq);
+        assert_prune(40, p, vec![true, true, true, true]).await;
+    }
+
+    #[tokio::test]
+    async fn test_prune_between_expr() {
+        let p = gen_predicate(30, Operator::LtEq);
+        assert_prune(40, p, vec![true, true, true, true]).await;
+    }
+
+    #[tokio::test]
+    async fn test_or() {
+        // cnt > 30 or cnt < 20
+        let e = Expr::Column(Column::from_name("cnt"))
+            .gt(30.lit())
+            .or(Expr::Column(Column::from_name("cnt")).lt(20.lit()));
+        let p = Predicate::new(vec![e.into()]);
+        assert_prune(40, p, vec![true, true, false, true]).await;
+    }
+}
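Editorial note (not part of the patch): `prune_row_groups` starts from an all-`true` mask and ANDs in the verdict of each expression's `PruningPredicate`, so a row group survives only if every pushed-down filter says it might match; any expression that fails to build or evaluate is skipped conservatively (logged, mask left unchanged). A standalone sketch of that combination step, with the per-expression evaluation stubbed out:

```rust
/// Combine per-expression keep-masks the way `Predicate::prune_row_groups` does:
/// start from "keep everything" and AND in each expression's verdict.
fn combine_masks(num_row_groups: usize, per_expr_masks: &[Option<Vec<bool>>]) -> Vec<bool> {
    let mut res = vec![true; num_row_groups];
    for mask in per_expr_masks {
        // `None` stands in for "pruning failed for this expression": keep the mask unchanged.
        if let Some(mask) = mask {
            for (keep, verdict) in res.iter_mut().zip(mask) {
                *keep &= *verdict;
            }
        }
    }
    res
}

fn main() {
    // Two filters over three row groups: only the group both filters allow survives.
    let masks = vec![Some(vec![true, true, false]), Some(vec![false, true, true])];
    assert_eq!(combine_masks(3, &masks), vec![false, true, false]);
}
```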
diff --git a/src/table/src/predicate/stats.rs b/src/table/src/predicate/stats.rs
new file mode 100644
index 0000000000..b1e5d0d5a5
--- /dev/null
+++ b/src/table/src/predicate/stats.rs
@@ -0,0 +1,142 @@
+use datafusion::parquet::metadata::RowGroupMetaData;
+use datafusion::parquet::statistics::{
+    BinaryStatistics, BooleanStatistics, FixedLenStatistics, PrimitiveStatistics,
+};
+use datafusion::physical_optimizer::pruning::PruningStatistics;
+use datafusion_common::{Column, ScalarValue};
+use datatypes::arrow::array::ArrayRef;
+use datatypes::arrow::datatypes::DataType;
+use datatypes::arrow::io::parquet::read::PhysicalType;
+use datatypes::prelude::Vector;
+use datatypes::vectors::Int64Vector;
+use paste::paste;
+
+pub struct RowGroupPruningStatistics<'a> {
+    pub meta_data: &'a [RowGroupMetaData],
+    pub schema: &'a datatypes::schema::SchemaRef,
+}
+
+impl<'a> RowGroupPruningStatistics<'a> {
+    pub fn new(
+        meta_data: &'a [RowGroupMetaData],
+        schema: &'a datatypes::schema::SchemaRef,
+    ) -> Self {
+        Self { meta_data, schema }
+    }
+
+    fn field_by_name(&self, name: &str) -> Option<(usize, &DataType)> {
+        let idx = self.schema.column_index_by_name(name)?;
+        let data_type = &self.schema.arrow_schema().fields.get(idx)?.data_type;
+        Some((idx, data_type))
+    }
+}
+
+macro_rules! impl_min_max_values {
+    ($self:ident, $col:ident, $min_max: ident) => {
+        paste! {
+            {
+                let (column_index, data_type) = $self.field_by_name(&$col.name)?;
+                let null_scalar: ScalarValue = data_type.try_into().ok()?;
+                let scalar_values: Vec<ScalarValue> = $self
+                    .meta_data
+                    .iter()
+                    .flat_map(|meta| meta.column(column_index).statistics())
+                    .map(|stats| {
+                        let stats = stats.ok()?;
+                        let res = match stats.physical_type() {
+                            PhysicalType::Boolean => {
+                                let $min_max = stats.as_any().downcast_ref::<BooleanStatistics>().unwrap().[<$min_max _value>];
+                                Some(ScalarValue::Boolean($min_max))
+                            }
+                            PhysicalType::Int32 => {
+                                let $min_max = stats
+                                    .as_any()
+                                    .downcast_ref::<PrimitiveStatistics<i32>>()
+                                    .unwrap()
+                                    .[<$min_max _value>];
+                                Some(ScalarValue::Int32($min_max))
+                            }
+                            PhysicalType::Int64 => {
+                                let $min_max = stats
+                                    .as_any()
+                                    .downcast_ref::<PrimitiveStatistics<i64>>()
+                                    .unwrap()
+                                    .[<$min_max _value>];
+                                Some(ScalarValue::Int64($min_max))
+                            }
+                            PhysicalType::Int96 => {
+                                // INT96 currently not supported
+                                None
+                            }
+                            PhysicalType::Float => {
+                                let $min_max = stats
+                                    .as_any()
+                                    .downcast_ref::<PrimitiveStatistics<f32>>()
+                                    .unwrap()
+                                    .[<$min_max _value>];
+                                Some(ScalarValue::Float32($min_max))
+                            }
+                            PhysicalType::Double => {
+                                let $min_max = stats
+                                    .as_any()
+                                    .downcast_ref::<PrimitiveStatistics<f64>>()
+                                    .unwrap()
+                                    .[<$min_max _value>];
+                                Some(ScalarValue::Float64($min_max))
+                            }
+                            PhysicalType::ByteArray => {
+                                let $min_max = stats
+                                    .as_any()
+                                    .downcast_ref::<BinaryStatistics>()
+                                    .unwrap()
+                                    .[<$min_max _value>]
+                                    .clone();
+                                Some(ScalarValue::Binary($min_max))
+                            }
+                            PhysicalType::FixedLenByteArray(_) => {
+                                let $min_max = stats
+                                    .as_any()
+                                    .downcast_ref::<FixedLenStatistics>()
+                                    .unwrap()
+                                    .[<$min_max _value>]
+                                    .clone();
+                                Some(ScalarValue::Binary($min_max))
+                            }
+                        };
+
+                        res
+                    })
+                    .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
+                    .collect::<Vec<_>>();
+                ScalarValue::iter_to_array(scalar_values).ok()
+            }
+        }
+    };
+}
+
+impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        impl_min_max_values!(self, column, min)
+    }
+
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        impl_min_max_values!(self, column, max)
+    }
+
+    fn num_containers(&self) -> usize {
+        self.meta_data.len()
+    }
+
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+        let (idx, _) = self.field_by_name(&column.name)?;
+        let mut values: Vec<Option<i64>> = Vec::with_capacity(self.meta_data.len());
+        for m in self.meta_data {
+            let col = m.column(idx);
+            let stat = col.statistics()?.ok()?;
+            let bs = stat.null_count();
+            values.push(bs);
+        }
+
+        Some(Int64Vector::from(values).to_arrow_array())
+    }
+}
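Editorial note (not part of the patch): `RowGroupPruningStatistics` exposes one min value, one max value, and one null count per row group (each returned array has `num_containers()` entries), and `PruningPredicate` rewrites a filter such as `cnt > 29` into a check against those arrays, roughly `max(cnt) > 29` per group. A standalone sketch of that rewrite for this one operator, using plain vectors instead of Arrow arrays:

```rust
/// For `col > threshold`, a row group can only contain matches if its max exceeds the threshold.
/// `maxes[i]` is the max statistic of row group `i`; `None` means the statistic is missing,
/// in which case the group must be kept conservatively.
fn prune_greater_than(maxes: &[Option<i32>], threshold: i32) -> Vec<bool> {
    maxes
        .iter()
        .map(|max| max.map_or(true, |m| m > threshold))
        .collect()
}

fn main() {
    // Row groups of 10 values each (0..40): the max statistics are 9, 19, 29, 39.
    let maxes = [Some(9), Some(19), Some(29), Some(39)];
    // Mirrors the `test_prune_gt` case above, scaled down to 40 values.
    assert_eq!(prune_greater_than(&maxes, 29), vec![false, false, false, true]);
}
```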
diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs
index 4d4c71b0f1..8f988b80e4 100644
--- a/src/table/src/table/adapter.rs
+++ b/src/table/src/table/adapter.rs
@@ -1,7 +1,7 @@
+use core::fmt::Formatter;
 use core::pin::Pin;
 use core::task::{Context, Poll};
 use std::any::Any;
-use std::fmt;
 use std::fmt::Debug;
 use std::mem;
 use std::sync::{Arc, Mutex};
@@ -9,6 +9,7 @@ use std::sync::{Arc, Mutex};
 use common_query::logical_plan::Expr;
 use common_recordbatch::error::Result as RecordBatchResult;
 use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
+use common_telemetry::debug;
 use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
 /// Datafusion table adapters
 use datafusion::datasource::{
@@ -40,9 +41,10 @@ struct ExecutionPlanAdapter {
 }
 
 impl Debug for ExecutionPlanAdapter {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        //TODO(dennis) better debug info
-        write!(f, "ExecutionPlan(PlaceHolder)")
+    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
+        f.debug_struct("ExecutionPlanAdapter")
+            .field("schema", &self.schema)
+            .finish()
     }
 }
 
@@ -202,7 +204,7 @@ impl Table for TableAdapter {
         limit: Option<usize>,
     ) -> Result<PhysicalPlanRef> {
         let filters: Vec<DfExpr> = filters.iter().map(|e| e.df_expr().clone()).collect();
-
+        debug!("TableScan filter size: {}", filters.len());
         let execution_plan = self
             .table_provider
             .scan(projection, &filters, limit)