Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-09 06:42:57 +00:00

feat: impl filter push down to parquet reader (#262)

* wip add predicate definition
* fix value move
* implement predicate and prune
* impl filter push down in chunk reader
* add more expr tests
* chore: rebase develop
* fix: unit test
* fix: field name/index lookup when building pruning stats
* chore: add some meaningless test
* fix: remove unnecessary extern crate
* fix: use datatypes::schema::SchemaRef
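The change wires filter expressions from the table scan down to the parquet reader, where row groups are pruned with DataFusion's PruningPredicate before any column data is read. A minimal sketch of how the new API fits together (illustrative only; `select_row_groups` is a hypothetical helper, while the types and methods follow the diff below):

    use common_query::logical_plan::Expr;
    use datatypes::arrow::io::parquet::read::RowGroupMetaData;
    use datatypes::schema::SchemaRef;
    use table::predicate::Predicate;

    /// Returns indices of the row groups that may contain matching rows.
    fn select_row_groups(
        filters: Vec<Expr>,
        schema: SchemaRef,
        row_groups: &[RowGroupMetaData],
    ) -> Vec<usize> {
        // Wrap the pushed-down filter expressions in a Predicate.
        let predicate = Predicate::new(filters);
        // `prune_row_groups` yields one bool per row group: `true` means the
        // group must be read, `false` means its statistics rule it out.
        predicate
            .prune_row_groups(schema, row_groups)
            .into_iter()
            .enumerate()
            .filter(|(_, keep)| *keep)
            .map(|(idx, _)| idx)
            .collect()
    }

Since pruning relies on row-group min/max statistics it is only a coarse filter; the table therefore reports FilterPushDownType::Inexact so the query engine still re-applies the filters on the returned batches.
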
Cargo.lock (generated): 11 changes
@@ -4897,6 +4897,7 @@ dependencies = [
"bytes",
"common-base",
"common-error",
"common-query",
"common-runtime",
"common-telemetry",
"common-time",
@@ -4916,6 +4917,7 @@ dependencies = [
"serde_json",
"snafu",
"store-api",
"table",
"tempdir",
"tokio",
"tonic",
@@ -4932,6 +4934,7 @@ dependencies = [
"bytes",
"common-base",
"common-error",
"common-query",
"common-time",
"datatypes",
"derive_builder",
@@ -5090,14 +5093,21 @@ dependencies = [
"common-error",
"common-query",
"common-recordbatch",
"common-telemetry",
"datafusion",
"datafusion-common",
"datafusion-expr",
"datatypes",
"derive_builder",
"futures",
"parquet-format-async-temp",
"paste",
"serde",
"snafu",
"store-api",
"tempdir",
"tokio",
"tokio-util",
]

[[package]]
@@ -5447,6 +5457,7 @@ checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740"
dependencies = [
"bytes",
"futures-core",
"futures-io",
"futures-sink",
"pin-project-lite",
"tokio",

@@ -1,5 +1,5 @@
use std::any::Any;
use std::fmt::{self, Debug};
use std::fmt::Debug;
use std::sync::Arc;

use common_recordbatch::SendableRecordBatchStream;
@@ -22,6 +22,7 @@ use crate::executor::Runtime;
use crate::plan::{Partitioning, PhysicalPlan};

/// Datafusion ExecutionPlan -> greptime PhysicalPlan
#[derive(Debug)]
pub struct PhysicalPlanAdapter {
plan: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
@@ -109,18 +110,12 @@ impl PhysicalPlan for PhysicalPlanAdapter {
}

/// Greptime PhysicalPlan -> datafusion ExecutionPlan.
#[derive(Debug)]
struct ExecutionPlanAdapter {
plan: Arc<dyn PhysicalPlan>,
schema: SchemaRef,
}

impl Debug for ExecutionPlanAdapter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
//TODO(dennis) better debug info
write!(f, "ExecutionPlan(PlaceHolder)")
}
}

#[async_trait::async_trait]
impl ExecutionPlan for ExecutionPlanAdapter {
fn as_any(&self) -> &dyn Any {
@@ -182,3 +177,39 @@ impl ExecutionPlan for ExecutionPlanAdapter {
Statistics::default()
}
}

#[cfg(test)]
mod tests {
use arrow::datatypes::Field;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion_common::field_util::SchemaExt;
use datatypes::schema::Schema;

use super::*;

#[test]
fn test_physical_plan_adapter() {
let arrow_schema = arrow::datatypes::Schema::new(vec![Field::new(
"name",
arrow::datatypes::DataType::Utf8,
true,
)]);

let schema = Arc::new(Schema::try_from(arrow_schema.clone()).unwrap());
let physical_plan = PhysicalPlanAdapter::new(
schema.clone(),
Arc::new(EmptyExec::new(true, Arc::new(arrow_schema))),
);

assert!(physical_plan
.plan
.as_any()
.downcast_ref::<EmptyExec>()
.is_some());
let execution_plan_adapter = ExecutionPlanAdapter {
plan: Arc::new(physical_plan),
schema: schema.clone(),
};
assert_eq!(schema, execution_plan_adapter.schema);
}
}

@@ -1,4 +1,5 @@
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;

use common_recordbatch::SendableRecordBatchStream;
@@ -39,7 +40,7 @@ impl Partitioning {
}

#[async_trait::async_trait]
pub trait PhysicalPlan: Send + Sync + Any {
pub trait PhysicalPlan: Send + Sync + Any + Debug {
/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef;

@@ -12,6 +12,7 @@ async-trait = "0.1"
bytes = "1.1"
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-query = { path = "../common/query" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
@@ -29,6 +30,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
store-api = { path = "../store-api" }
table = { path = "../table" }
tokio = { version = "1.18", features = ["full"] }
tonic = "0.8"
uuid = { version = "1.1", features = ["v4"] }

@@ -1,8 +1,10 @@
use std::sync::Arc;

use async_trait::async_trait;
use common_query::logical_plan::Expr;
use snafu::ResultExt;
use store_api::storage::{Chunk, ChunkReader, SchemaRef, SequenceNumber};
use table::predicate::Predicate;

use crate::error::{self, Error, Result};
use crate::memtable::{IterContext, MemtableRef, MemtableSet};
@@ -49,6 +51,7 @@ impl ChunkReaderImpl {
pub struct ChunkReaderBuilder {
schema: RegionSchemaRef,
projection: Option<Vec<usize>>,
filters: Vec<Expr>,
sst_layer: AccessLayerRef,
iter_ctx: IterContext,
memtables: Vec<MemtableRef>,
@@ -60,6 +63,7 @@ impl ChunkReaderBuilder {
ChunkReaderBuilder {
schema,
projection: None,
filters: vec![],
sst_layer,
iter_ctx: IterContext::default(),
memtables: Vec::new(),
@@ -78,6 +82,11 @@ impl ChunkReaderBuilder {
self
}

pub fn filters(mut self, filters: Vec<Expr>) -> Self {
self.filters = filters;
self
}

pub fn batch_size(mut self, batch_size: usize) -> Self {
self.iter_ctx.batch_size = batch_size;
self
@@ -121,6 +130,7 @@ impl ChunkReaderBuilder {
let read_opts = ReadOptions {
batch_size: self.iter_ctx.batch_size,
projected_schema: schema.clone(),
predicate: Predicate::new(self.filters),
};
for file in &self.files_to_read {
let reader = self

@@ -43,6 +43,7 @@ impl Snapshot for SnapshotImpl {
ChunkReaderBuilder::new(self.version.schema().clone(), self.sst_layer.clone())
.reserve_num_memtables(memtable_version.num_memtables())
.projection(request.projection)
.filters(request.filters)
.batch_size(ctx.batch_size)
.visible_sequence(visible_sequence)
.pick_memtables(mutables);

@@ -5,6 +5,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use object_store::{util, ObjectStore};
use serde::{Deserialize, Serialize};
use table::predicate::Predicate;

use crate::error::Result;
use crate::memtable::BoxedBatchIterator;
@@ -170,13 +171,14 @@ pub struct WriteOptions {
// TODO(yingwen): [flush] row group size.
}

#[derive(Debug)]
pub struct ReadOptions {
/// Suggested size of each batch.
pub batch_size: usize,
/// The schema that user expected to read, might not the same as the
/// schema of the SST file.
pub projected_schema: ProjectedSchemaRef,

pub predicate: Predicate,
}

/// SST access layer.
@@ -240,6 +242,7 @@ impl AccessLayer for FsAccessLayer {
&file_path,
self.object_store.clone(),
opts.projected_schema.clone(),
opts.predicate.clone(),
);

let stream = reader.chunk_stream(opts.batch_size).await?;

@@ -6,6 +6,7 @@ use std::sync::Arc;

use async_stream::try_stream;
use async_trait::async_trait;
use common_telemetry::debug;
use datatypes::arrow::array::Array;
use datatypes::arrow::chunk::Chunk;
use datatypes::arrow::datatypes::{DataType, Field, Schema};
@@ -19,6 +20,7 @@ use futures_util::sink::SinkExt;
use futures_util::{Stream, TryStreamExt};
use object_store::{ObjectStore, SeekableReader};
use snafu::ResultExt;
use table::predicate::Predicate;

use crate::error::{self, Result};
use crate::memtable::BoxedBatchIterator;
@@ -157,6 +159,7 @@ pub struct ParquetReader<'a> {
file_path: &'a str,
object_store: ObjectStore,
projected_schema: ProjectedSchemaRef,
predicate: Predicate,
}

type ReaderFactoryFuture<'a, R> =
@@ -167,11 +170,13 @@ impl<'a> ParquetReader<'a> {
file_path: &str,
object_store: ObjectStore,
projected_schema: ProjectedSchemaRef,
predicate: Predicate,
) -> ParquetReader {
ParquetReader {
file_path,
object_store,
projected_schema,
predicate,
}
}

@@ -191,19 +196,31 @@ impl<'a> ParquetReader<'a> {
let metadata = read_metadata_async(&mut reader)
.await
.context(error::ReadParquetSnafu { file: &file_path })?;

let arrow_schema =
infer_schema(&metadata).context(error::ReadParquetSnafu { file: &file_path })?;

// Now the StoreSchema is only used to validate metadata of the parquet file, but this schema
// would be useful once we support altering schema, as this is the actual schema of the SST.
let _store_schema = StoreSchema::try_from(arrow_schema)
let store_schema = StoreSchema::try_from(arrow_schema)
.context(error::ConvertStoreSchemaSnafu { file: &file_path })?;

let pruned_row_groups = self
.predicate
.prune_row_groups(store_schema.schema().clone(), &metadata.row_groups);

let projected_fields = self.projected_fields().to_vec();
let chunk_stream = try_stream!({
for rg in metadata.row_groups {
for (idx, valid) in pruned_row_groups.iter().enumerate() {
if !valid {
debug!("Pruned {} row groups", idx);
continue;
}

let rg = &metadata.row_groups[idx];
let column_chunks = read_columns_many_async(
&reader_factory,
&rg,
rg,
projected_fields.clone(),
Some(chunk_size),
)

@@ -9,6 +9,7 @@ async-trait = "0.1"
bytes = "1.1"
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-query = { path = "../common/query" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
derive_builder = "0.11"

@@ -1,6 +1,7 @@
use std::time::Duration;

use common_error::ext::ErrorExt;
use common_query::logical_plan::Expr;
use common_time::RangeMillis;
use datatypes::vectors::VectorRef;

@@ -34,7 +35,7 @@ pub trait PutOperation: Send {
fn add_value_column(&mut self, name: &str, vector: VectorRef) -> Result<(), Self::Error>;
}

#[derive(Debug, Default)]
#[derive(Default)]
pub struct ScanRequest {
/// Max sequence number to read, None for latest sequence.
///
@@ -43,6 +44,8 @@ pub struct ScanRequest {
pub sequence: Option<SequenceNumber>,
/// Indices of columns to read, `None` to read all columns.
pub projection: Option<Vec<usize>>,
/// Filters pushed down
pub filters: Vec<Expr>,
}

#[derive(Debug)]

@@ -24,7 +24,7 @@ use store_api::storage::{
WriteContext, WriteRequest,
};
use table::error::{Error as TableError, MissingColumnSnafu, Result as TableResult};
use table::metadata::{TableInfoRef, TableMetaBuilder};
use table::metadata::{FilterPushDownType, TableInfoRef, TableMetaBuilder};
use table::requests::{AlterKind, AlterTableRequest, InsertRequest};
use table::{
metadata::{TableInfo, TableType},
@@ -118,16 +118,17 @@ impl<R: Region> Table for MitoTable<R> {
async fn scan(
&self,
projection: &Option<Vec<usize>>,
_filters: &[Expr],
filters: &[Expr],
_limit: Option<usize>,
) -> TableResult<SendableRecordBatchStream> {
let read_ctx = ReadContext::default();
let snapshot = self.region.snapshot(&read_ctx).map_err(TableError::new)?;

let projection = self.transform_projection(&self.region, projection.clone())?;

let filters = filters.into();
let scan_request = ScanRequest {
projection,
filters,
..Default::default()
};
let mut reader = snapshot
@@ -243,6 +244,10 @@ impl<R: Region> Table for MitoTable<R> {
// table cannot be hold.
Ok(())
}

fn supports_filter_pushdown(&self, _filter: &Expr) -> table::error::Result<FilterPushDownType> {
Ok(FilterPushDownType::Inexact)
}
}

fn build_table_schema_with_new_column(

@@ -9,11 +9,20 @@ chrono = { version = "0.4", features = ["serde"] }
common-error = { path = "../common/error" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
datatypes = { path = "../datatypes" }
derive_builder = "0.11"
futures = "0.3"
parquet-format-async-temp = "0.2"
paste = "1.0"
serde = "1.0.136"
snafu = { version = "0.7", features = ["backtraces"] }
store-api = { path = "../store-api" }

[dev-dependencies]
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
tempdir = "0.3"
tokio = { version = "1.18", features = ["full"] }
tokio-util = { version = "0.7", features = ["compat"] }

@@ -1,6 +1,7 @@
pub mod engine;
pub mod error;
pub mod metadata;
pub mod predicate;
pub mod requests;
pub mod table;

src/table/src/predicate.rs (new file, 227 lines)
@@ -0,0 +1,227 @@
mod stats;

use common_query::logical_plan::Expr;
use common_telemetry::{error, warn};
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datatypes::arrow::io::parquet::read::RowGroupMetaData;
use datatypes::schema::SchemaRef;

use crate::predicate::stats::RowGroupPruningStatistics;

#[derive(Default, Clone)]
pub struct Predicate {
exprs: Vec<Expr>,
}

impl Predicate {
pub fn new(exprs: Vec<Expr>) -> Self {
Self { exprs }
}

pub fn empty() -> Self {
Self { exprs: vec![] }
}

pub fn prune_row_groups(
&self,
schema: SchemaRef,
row_groups: &[RowGroupMetaData],
) -> Vec<bool> {
let mut res = vec![true; row_groups.len()];
for expr in &self.exprs {
match PruningPredicate::try_new(expr.df_expr().clone(), schema.arrow_schema().clone()) {
Ok(p) => {
let stat = RowGroupPruningStatistics::new(row_groups, &schema);
match p.prune(&stat) {
Ok(r) => {
for (curr_val, res) in r.into_iter().zip(res.iter_mut()) {
*res &= curr_val
}
}
Err(e) => {
warn!("Failed to prune row groups, error: {:?}", e);
}
}
}
Err(e) => {
error!("Failed to create predicate for expr, error: {:?}", e);
}
}
}
res
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

pub use datafusion::parquet::schema::types::{BasicTypeInfo, PhysicalType};
use datafusion_common::Column;
use datafusion_expr::Expr;
use datafusion_expr::Literal;
use datafusion_expr::Operator;
use datatypes::arrow::array::{Int32Array, Utf8Array};
use datatypes::arrow::chunk::Chunk;
use datatypes::arrow::datatypes::{DataType, Field, Schema};
use datatypes::arrow::io::parquet::read::FileReader;
use datatypes::arrow::io::parquet::write::{
Compression, Encoding, FileSink, Version, WriteOptions,
};
use futures::SinkExt;
use tempdir::TempDir;
use tokio_util::compat::TokioAsyncReadCompatExt;

use super::*;

async fn gen_test_parquet_file(dir: &TempDir, cnt: usize) -> (String, Arc<Schema>) {
let path = dir
.path()
.join("test-prune.parquet")
.to_string_lossy()
.to_string();

let name_field = Field::new("name", DataType::Utf8, true);
let count_field = Field::new("cnt", DataType::Int32, true);

let schema = Schema::from(vec![name_field, count_field]);

// now all physical types use plain encoding, maybe let caller to choose encoding for each type.
let encodings = vec![Encoding::Plain].repeat(schema.fields.len());

let writer = tokio::fs::OpenOptions::new()
.write(true)
.create(true)
.open(&path)
.await
.unwrap()
.compat();

let mut sink = FileSink::try_new(
writer,
schema.clone(),
encodings,
WriteOptions {
write_statistics: true,
compression: Compression::Gzip,
version: Version::V2,
},
)
.unwrap();

for i in (0..cnt).step_by(10) {
let name_array = Utf8Array::<i64>::from(
&(i..(i + 10).min(cnt))
.map(|i| Some(i.to_string()))
.collect::<Vec<_>>(),
);
let count_array = Int32Array::from(
&(i..(i + 10).min(cnt))
.map(|i| Some(i as i32))
.collect::<Vec<_>>(),
);

sink.send(Chunk::new(vec![
Arc::new(name_array),
Arc::new(count_array),
]))
.await
.unwrap();
}
sink.close().await.unwrap();
(path, Arc::new(schema))
}

async fn assert_prune(array_cnt: usize, predicate: Predicate, expect: Vec<bool>) {
let dir = TempDir::new("prune_parquet").unwrap();
let (path, schema) = gen_test_parquet_file(&dir, array_cnt).await;
let file_reader =
FileReader::try_new(std::fs::File::open(path).unwrap(), None, None, None, None)
.unwrap();

let schema = Arc::new(datatypes::schema::Schema::try_from(schema).unwrap());

let vec = file_reader.metadata().row_groups.clone();
let res = predicate.prune_row_groups(schema, &vec);
assert_eq!(expect, res);
}

fn gen_predicate(max_val: i32, op: Operator) -> Predicate {
Predicate::new(vec![Expr::BinaryExpr {
left: Box::new(Expr::Column(Column::from_name("cnt".to_string()))),
op,
right: Box::new(max_val.lit()),
}
.into()])
}

#[tokio::test]
async fn test_prune_empty() {
assert_prune(3, Predicate::empty(), vec![true]).await;
}

#[tokio::test]
async fn test_prune_all_match() {
let p = gen_predicate(3, Operator::Gt);
assert_prune(2, p, vec![false]).await;
}

#[tokio::test]
async fn test_prune_gt() {
let p = gen_predicate(29, Operator::Gt);
assert_prune(
100,
p,
vec![
false, false, false, true, true, true, true, true, true, true,
],
)
.await;
}

#[tokio::test]
async fn test_prune_eq_expr() {
let p = gen_predicate(30, Operator::Eq);
assert_prune(40, p, vec![false, false, false, true]).await;
}

#[tokio::test]
async fn test_prune_neq_expr() {
let p = gen_predicate(30, Operator::NotEq);
assert_prune(40, p, vec![true, true, true, true]).await;
}

#[tokio::test]
async fn test_prune_gteq_expr() {
let p = gen_predicate(29, Operator::GtEq);
assert_prune(40, p, vec![false, false, true, true]).await;
}

#[tokio::test]
async fn test_prune_lt_expr() {
let p = gen_predicate(30, Operator::Lt);
assert_prune(40, p, vec![true, true, true, false]).await;
}

#[tokio::test]
async fn test_prune_lteq_expr() {
let p = gen_predicate(30, Operator::LtEq);
assert_prune(40, p, vec![true, true, true, true]).await;
}

#[tokio::test]
async fn test_prune_between_expr() {
let p = gen_predicate(30, Operator::LtEq);
assert_prune(40, p, vec![true, true, true, true]).await;
}

#[tokio::test]
async fn test_or() {
// cnt > 30 or cnt < 20
let e = Expr::Column(Column::from_name("cnt"))
.gt(30.lit())
.or(Expr::Column(Column::from_name("cnt")).lt(20.lit()));
let p = Predicate::new(vec![e.into()]);
assert_prune(40, p, vec![true, true, false, true]).await;
}
}

src/table/src/predicate/stats.rs (new file, 142 lines)
@@ -0,0 +1,142 @@
use datafusion::parquet::metadata::RowGroupMetaData;
use datafusion::parquet::statistics::{
BinaryStatistics, BooleanStatistics, FixedLenStatistics, PrimitiveStatistics,
};
use datafusion::physical_optimizer::pruning::PruningStatistics;
use datafusion_common::{Column, ScalarValue};
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::datatypes::DataType;
use datatypes::arrow::io::parquet::read::PhysicalType;
use datatypes::prelude::Vector;
use datatypes::vectors::Int64Vector;
use paste::paste;

pub struct RowGroupPruningStatistics<'a> {
pub meta_data: &'a [RowGroupMetaData],
pub schema: &'a datatypes::schema::SchemaRef,
}

impl<'a> RowGroupPruningStatistics<'a> {
pub fn new(
meta_data: &'a [RowGroupMetaData],
schema: &'a datatypes::schema::SchemaRef,
) -> Self {
Self { meta_data, schema }
}

fn field_by_name(&self, name: &str) -> Option<(usize, &DataType)> {
let idx = self.schema.column_index_by_name(name)?;
let data_type = &self.schema.arrow_schema().fields.get(idx)?.data_type;
Some((idx, data_type))
}
}

macro_rules! impl_min_max_values {
($self:ident, $col:ident, $min_max: ident) => {
paste! {
{
let (column_index, data_type) = $self.field_by_name(&$col.name)?;
let null_scalar: ScalarValue = data_type.try_into().ok()?;
let scalar_values: Vec<ScalarValue> = $self
.meta_data
.iter()
.flat_map(|meta| meta.column(column_index).statistics())
.map(|stats| {
let stats = stats.ok()?;
let res = match stats.physical_type() {
PhysicalType::Boolean => {
let $min_max = stats.as_any().downcast_ref::<BooleanStatistics>().unwrap().[<$min_max _value>];
Some(ScalarValue::Boolean($min_max))
}
PhysicalType::Int32 => {
let $min_max = stats
.as_any()
.downcast_ref::<PrimitiveStatistics<i32>>()
.unwrap()
.[<$min_max _value>];
Some(ScalarValue::Int32($min_max))
}
PhysicalType::Int64 => {
let $min_max = stats
.as_any()
.downcast_ref::<PrimitiveStatistics<i64>>()
.unwrap()
.[<$min_max _value>];
Some(ScalarValue::Int64($min_max))
}
PhysicalType::Int96 => {
// INT96 currently not supported
None
}
PhysicalType::Float => {
let $min_max = stats
.as_any()
.downcast_ref::<PrimitiveStatistics<f32>>()
.unwrap()
.[<$min_max _value>];
Some(ScalarValue::Float32($min_max))
}
PhysicalType::Double => {
let $min_max = stats
.as_any()
.downcast_ref::<PrimitiveStatistics<f64>>()
.unwrap()
.[<$min_max _value>];
Some(ScalarValue::Float64($min_max))
}
PhysicalType::ByteArray => {
let $min_max = stats
.as_any()
.downcast_ref::<BinaryStatistics>()
.unwrap()
.[<$min_max _value>]
.clone();
Some(ScalarValue::Binary($min_max))
}
PhysicalType::FixedLenByteArray(_) => {
let $min_max = stats
.as_any()
.downcast_ref::<FixedLenStatistics>()
.unwrap()
.[<$min_max _value>]
.clone();
Some(ScalarValue::Binary($min_max))
}
};

res
})
.map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
.collect::<Vec<_>>();
ScalarValue::iter_to_array(scalar_values).ok()
}
}
};
}

impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
impl_min_max_values!(self, column, min)
}

fn max_values(&self, column: &Column) -> Option<ArrayRef> {
impl_min_max_values!(self, column, max)
}

fn num_containers(&self) -> usize {
self.meta_data.len()
}

fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
let (idx, _) = self.field_by_name(&column.name)?;
let mut values: Vec<Option<i64>> = Vec::with_capacity(self.meta_data.len());
for m in self.meta_data {
let col = m.column(idx);
let stat = col.statistics()?.ok()?;
let bs = stat.null_count();
values.push(bs);
}

Some(Int64Vector::from(values).to_arrow_array())
}
}

@@ -1,7 +1,7 @@
use core::fmt::Formatter;
use core::pin::Pin;
use core::task::{Context, Poll};
use std::any::Any;
use std::fmt;
use std::fmt::Debug;
use std::mem;
use std::sync::{Arc, Mutex};
@@ -9,6 +9,7 @@ use std::sync::{Arc, Mutex};
use common_query::logical_plan::Expr;
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use common_telemetry::debug;
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
/// Datafusion table adpaters
use datafusion::datasource::{
@@ -40,9 +41,10 @@ struct ExecutionPlanAdapter {
}

impl Debug for ExecutionPlanAdapter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
//TODO(dennis) better debug info
write!(f, "ExecutionPlan(PlaceHolder)")
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
f.debug_struct("ExecutionPlanAdapter")
.field("schema", &self.schema)
.finish()
}
}

@@ -202,7 +204,7 @@ impl Table for TableAdapter {
limit: Option<usize>,
) -> Result<SendableRecordBatchStream> {
let filters: Vec<DfExpr> = filters.iter().map(|e| e.df_expr().clone()).collect();

debug!("TableScan filter size: {}", filters.len());
let execution_plan = self
.table_provider
.scan(projection, &filters, limit)