diff --git a/Cargo.lock b/Cargo.lock
index b768070c61..6039d477e6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1892,6 +1892,7 @@ dependencies = [
  "arrow2",
  "async-stream",
  "async-trait",
+ "catalog",
  "client",
  "common-base",
  "common-error",
@@ -1907,8 +1908,10 @@ dependencies = [
  "datanode",
  "datatypes",
  "futures",
+ "itertools",
  "openmetrics-parser",
  "prost 0.11.0",
+ "query",
  "serde",
  "servers",
  "snafu",
diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml
index a1ace3bde9..7f5b08dc81 100644
--- a/src/frontend/Cargo.toml
+++ b/src/frontend/Cargo.toml
@@ -7,6 +7,7 @@ edition = "2021"
 api = { path = "../api" }
 async-stream = "0.3"
 async-trait = "0.1"
+catalog = { path = "../catalog" }
 client = { path = "../client" }
 common-base = { path = "../common/base" }
 common-error = { path = "../common/error" }
@@ -16,10 +17,14 @@ common-recordbatch = { path = "../common/recordbatch" }
 common-runtime = { path = "../common/runtime" }
 common-telemetry = { path = "../common/telemetry" }
 common-time = { path = "../common/time" }
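+# DataFusion is needed at runtime: the frontend builds DataFusion logical plans
+# for distributed scans (see src/mock.rs and src/table.rs).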
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
+datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
 datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
 datatypes = { path = "../datatypes" }
 openmetrics-parser = "0.4"
 prost = "0.11"
+itertools = "0.10"
+query = { path = "../query" }
 serde = "1.0"
 servers = { path = "../servers" }
 snafu = { version = "0.7", features = ["backtraces"] }
@@ -34,8 +39,6 @@ version = "0.10"
 features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
 
 [dev-dependencies]
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
-datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
 datanode = { path = "../datanode" }
 futures = "0.3"
 tempdir = "0.3"
diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs
index 34c4a3a7f2..dd2656e899 100644
--- a/src/frontend/src/error.rs
+++ b/src/frontend/src/error.rs
@@ -1,6 +1,11 @@
 use std::any::Any;
 
 use common_error::prelude::*;
+use common_query::logical_plan::Expr;
+use datafusion_common::ScalarValue;
+use store_api::storage::RegionId;
+
+use crate::mock::DatanodeId;
 
 #[derive(Debug, Snafu)]
 #[snafu(visibility(pub))]
@@ -83,6 +88,17 @@ pub enum Error {
         backtrace: Backtrace,
     },
 
+    #[snafu(display(
+        "Failed to convert DataFusion's ScalarValue: {:?}, source: {}",
+        value,
+        source
+    ))]
+    ConvertScalarValue {
+        value: ScalarValue,
+        #[snafu(backtrace)]
+        source: datatypes::error::Error,
+    },
+
     #[snafu(display("Failed to find partition column: {}", column_name))]
     FindPartitionColumn {
         column_name: String,
@@ -95,6 +111,24 @@ pub enum Error {
         backtrace: Backtrace,
     },
 
+    #[snafu(display("Failed to find regions by filters: {:?}", filters))]
+    FindRegions {
+        filters: Vec<Expr>,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Failed to find Datanode by region: {:?}", region))]
+    FindDatanode {
+        region: RegionId,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Failed to get Datanode instance: {:?}", datanode))]
+    DatanodeInstance {
+        datanode: DatanodeId,
+        backtrace: Backtrace,
+    },
+
     #[snafu(display("Invalid InsertRequest, reason: {}", reason))]
     InvalidInsertRequest {
         reason: String,
@@ -118,15 +152,26 @@ impl ErrorExt for Error {
             | Error::ParseAddr { .. }
             | Error::InvalidSql { .. }
             | Error::FindRegion { .. }
+            | Error::FindRegions { .. }
             | Error::InvalidInsertRequest { .. }
             | Error::FindPartitionColumn { .. }
             | Error::RegionKeysSize { .. } => StatusCode::InvalidArguments,
+
             Error::RuntimeResource { source, .. } => source.status_code(),
+
             Error::StartServer { source, .. } => source.status_code(),
+
             Error::ParseSql { source } => source.status_code(),
-            Error::ConvertColumnDefaultConstraint { source, .. } => source.status_code(),
+
+            Error::ConvertColumnDefaultConstraint { source, .. }
+            | Error::ConvertScalarValue { source, .. } => source.status_code(),
+
             Error::RequestDatanode { source } => source.status_code(),
-            Error::ColumnDataType { .. } => StatusCode::Internal,
+
+            Error::ColumnDataType { .. }
+            | Error::FindDatanode { .. }
+            | Error::DatanodeInstance { .. } => StatusCode::Internal,
+
             Error::IllegalFrontendState { .. } | Error::IncompleteGrpcResult { .. } => {
                 StatusCode::Unexpected
             }
diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs
index 63bae536a8..0bc53567ea 100644
--- a/src/frontend/src/lib.rs
+++ b/src/frontend/src/lib.rs
@@ -5,6 +5,7 @@ pub mod frontend;
 pub mod grpc;
 pub mod influxdb;
 pub mod instance;
+pub(crate) mod mock;
 pub mod mysql;
 pub mod opentsdb;
 pub mod partitioning;
@@ -12,5 +13,6 @@ pub mod postgres;
 pub mod prometheus;
 mod server;
 pub mod spliter;
+mod table;
 #[cfg(test)]
 mod tests;
diff --git a/src/frontend/src/mock.rs b/src/frontend/src/mock.rs
new file mode 100644
index 0000000000..bd4299b548
--- /dev/null
+++ b/src/frontend/src/mock.rs
@@ -0,0 +1,169 @@
+// FIXME(LFC): remove this mock
+
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use catalog::CatalogManagerRef;
+use client::{Database, Select};
+use common_query::prelude::Expr;
+use common_query::Output;
+use common_recordbatch::util;
+use common_recordbatch::RecordBatches;
+use datafusion::logical_plan::{LogicalPlan as DfLogicPlan, LogicalPlanBuilder};
+use datafusion_expr::Expr as DfExpr;
+use datatypes::prelude::Value;
+use datatypes::schema::SchemaRef;
+use query::plan::LogicalPlan;
+use table::table::adapter::DfTableProviderAdapter;
+
+pub(crate) type DatanodeId = u64;
+
+#[derive(Clone)]
+pub(crate) struct DatanodeInstance {
+    pub(crate) datanode_id: DatanodeId,
+    catalog_manager: CatalogManagerRef,
+    db: Database,
+}
+
+impl std::fmt::Debug for DatanodeInstance {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.write_str("DatanodeInstance")
+    }
+}
+
+impl DatanodeInstance {
+    #[allow(dead_code)]
+    pub(crate) fn new(
+        datanode_id: DatanodeId,
+        catalog_manager: CatalogManagerRef,
+        db: Database,
+    ) -> Self {
+        Self {
+            datanode_id,
+            catalog_manager,
+            db,
+        }
+    }
+
+    #[allow(clippy::print_stdout)]
+    pub(crate) async fn grpc_table_scan(&self, plan: TableScanPlan) -> RecordBatches {
+        let logical_plan = self.build_logical_plan(&plan);
+
+        // TODO(LFC): Pass the logical plan directly to the GRPC interface once our substrait codec supports filters.
+        let sql = to_sql(logical_plan);
+        println!("executing sql \"{}\" in datanode {}", sql, self.datanode_id);
+        let result = self.db.select(Select::Sql(sql)).await.unwrap();
+
+        let output: Output = result.try_into().unwrap();
+        let recordbatches = match output {
+            Output::Stream(stream) => util::collect(stream).await.unwrap(),
+            Output::RecordBatches(x) => x.take(),
+            _ => unreachable!(),
+        };
+
+        let schema = recordbatches.first().unwrap().schema.clone();
+        RecordBatches::try_new(schema, recordbatches).unwrap()
+    }
+
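+    // A sketch of the round trip above (table and values illustrative only): a
+    // TableScanPlan for table "t" with projection Some(vec![0]) over column "a",
+    // filter "a < 10", and limit 100 is rebuilt as a DataFusion logical plan
+    // below, degraded by `to_sql` into "select a from t where a < 10 limit 100",
+    // and executed on the remote Datanode.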
+    fn build_logical_plan(&self, table_scan: &TableScanPlan) -> LogicalPlan {
+        let catalog = self.catalog_manager.catalog("greptime").unwrap().unwrap();
+        let schema = catalog.schema("public").unwrap().unwrap();
+        let table = schema.table(&table_scan.table_name).unwrap().unwrap();
+        let table_provider = Arc::new(DfTableProviderAdapter::new(table.clone()));
+
+        let mut builder = LogicalPlanBuilder::scan_with_filters(
+            table_scan.table_name.clone(),
+            table_provider,
+            table_scan.projection.clone(),
+            table_scan
+                .filters
+                .iter()
+                .map(|x| x.df_expr().clone())
+                .collect::<Vec<_>>(),
+        )
+        .unwrap();
+        if let Some(limit) = table_scan.limit {
+            builder = builder.limit(limit).unwrap();
+        }
+
+        let plan = builder.build().unwrap();
+        LogicalPlan::DfPlan(plan)
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct TableScanPlan {
+    pub table_name: String,
+    pub projection: Option<Vec<usize>>,
+    pub filters: Vec<Expr>,
+    pub limit: Option<usize>,
+}
+
+fn to_sql(plan: LogicalPlan) -> String {
+    let LogicalPlan::DfPlan(plan) = plan;
+    let table_scan = match plan {
+        DfLogicPlan::TableScan(table_scan) => table_scan,
+        _ => unreachable!("unknown plan: {:?}", plan),
+    };
+
+    let schema: SchemaRef = Arc::new(table_scan.source.schema().try_into().unwrap());
+    let projection = table_scan
+        .projection
+        .map(|x| {
+            x.iter()
+                .map(|i| schema.column_name_by_index(*i).to_string())
+                .collect::<Vec<String>>()
+        })
+        .unwrap_or_else(|| {
+            schema
+                .column_schemas()
+                .iter()
+                .map(|x| x.name.clone())
+                .collect::<Vec<String>>()
+        })
+        .join(", ");
+
+    let mut sql = format!("select {} from {}", projection, &table_scan.table_name);
+
+    let filters = table_scan
+        .filters
+        .iter()
+        .map(expr_to_sql)
+        .collect::<Vec<String>>()
+        .join(" AND ");
+    if !filters.is_empty() {
+        sql.push_str(" where ");
+        sql.push_str(&filters);
+    }
+
+    if let Some(limit) = table_scan.limit {
+        sql.push_str(" limit ");
+        sql.push_str(&limit.to_string());
+    }
+    sql
+}
+
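+// Renders one filter expression as SQL text, e.g. (exprs illustrative only) the
+// tree `host = 'h1' AND a < 10` becomes "host = 'h1' AND a < 10": string
+// literals are single-quoted, all other literals use their Display form.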
+fn expr_to_sql(expr: &DfExpr) -> String {
+    match expr {
+        DfExpr::BinaryExpr {
+            ref left,
+            ref right,
+            ref op,
+        } => format!(
+            "{} {} {}",
+            expr_to_sql(left.as_ref()),
+            op,
+            expr_to_sql(right.as_ref())
+        ),
+        DfExpr::Column(c) => c.name.clone(),
+        DfExpr::Literal(sv) => {
+            let v: Value = Value::try_from(sv.clone()).unwrap();
+            if v.data_type().is_string() {
+                format!("'{}'", sv)
+            } else {
+                format!("{}", sv)
+            }
+        }
+        _ => unimplemented!("not implemented for {:?}", expr),
+    }
+}
diff --git a/src/frontend/src/partitioning.rs b/src/frontend/src/partitioning.rs
index 678705e379..db0432f479 100644
--- a/src/frontend/src/partitioning.rs
+++ b/src/frontend/src/partitioning.rs
@@ -1,13 +1,16 @@
 mod columns;
-mod range;
+pub(crate) mod range;
 
 use std::fmt::Debug;
+use std::sync::Arc;
 
 pub use datafusion_expr::Operator;
 use datatypes::prelude::Value;
 use store_api::storage::RegionId;
 
-pub trait PartitionRule {
+pub(crate) type PartitionRuleRef<E> = Arc<dyn PartitionRule<Error = E>>;
+
+pub trait PartitionRule: Sync + Send {
     type Error: Debug;
 
     fn partition_columns(&self) -> Vec<String>;
@@ -36,6 +39,14 @@ pub struct PartitionExpr {
 }
 
 impl PartitionExpr {
+    pub(crate) fn new(column: impl Into<String>, op: Operator, value: Value) -> Self {
+        Self {
+            column: column.into(),
+            op,
+            value,
+        }
+    }
+
     pub fn value(&self) -> &Value {
         &self.value
     }
diff --git a/src/frontend/src/partitioning/columns.rs b/src/frontend/src/partitioning/columns.rs
index 542e26cf83..2ebc78b829 100644
--- a/src/frontend/src/partitioning/columns.rs
+++ b/src/frontend/src/partitioning/columns.rs
@@ -67,6 +67,19 @@ impl RangeColumnsPartitionRule {
         value_lists: Vec<Vec<PartitionBound>>,
         regions: Vec<RegionId>,
     ) -> Self {
+        // An example of how a range columns partition rule maps to the first
+        // column bounds and regions:
+        // SQL:
+        //   PARTITION p1 VALUES LESS THAN (10, 'c'),
+        //   PARTITION p2 VALUES LESS THAN (20, 'h'),
+        //   PARTITION p3 VALUES LESS THAN (20, 'm'),
+        //   PARTITION p4 VALUES LESS THAN (50, 'p'),
+        //   PARTITION p5 VALUES LESS THAN (MAXVALUE, 'x'),
+        //   PARTITION p6 VALUES LESS THAN (MAXVALUE, MAXVALUE),
+        // first column bounds:
+        //   [10, 20, 50, MAXVALUE]
+        // first column regions:
+        //   [[1], [2, 3], [4], [5, 6]]
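+        // For instance (continuing the example): a row whose first column is 15
+        // matches "LESS THAN 20", i.e. bound index 1, so its candidate regions
+        // are [2, 3]; the remaining partition columns then decide between
+        // regions 2 and 3.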
+
         let first_column_bounds = value_lists
             .iter()
             .map(|x| &x[0])
@@ -136,16 +149,6 @@ impl PartitionRule for RangeColumnsPartitionRule {
             // "unwrap" is safe because we have checked that "self.column_list" contains all columns in "exprs"
             .unwrap();
 
-        // an example of bounds and regions:
-        // SQL:
-        // PARTITION p1 VALUES LESS THAN (10, 'c'),
-        // PARTITION p2 VALUES LESS THAN (20, 'h'),
-        // PARTITION p3 VALUES LESS THAN (20, 'm'),
-        // PARTITION p4 VALUES LESS THAN (50, 'p'),
-        // PARTITION p5 VALUES LESS THAN (MAXVALUE, 'x'),
-        // PARTITION p6 VALUES LESS THAN (MAXVALUE, MAXVALUE),
-        // bounds: [10, 20, 50, MAXVALUE]
-        // regions: [[1], [2, 3], [4], [5, 6]]
         let regions = &self.first_column_regions;
         match self
             .first_column_bounds
diff --git a/src/frontend/src/partitioning/range.rs b/src/frontend/src/partitioning/range.rs
index 572ff81a37..b67998a912 100644
--- a/src/frontend/src/partitioning/range.rs
+++ b/src/frontend/src/partitioning/range.rs
@@ -41,7 +41,7 @@ use crate::partitioning::{Operator, PartitionExpr, PartitionRule, RegionId};
 ///
 // TODO(LFC): Further clarify "partition" and "region".
 // Could be creating an extra layer between partition and region.
-struct RangePartitionRule {
+pub(crate) struct RangePartitionRule {
     column_name: String,
     // Does not store the last "MAXVALUE" bound, because this way our binary search for
     // the partition is easier (besides, it's hard to represent "MAXVALUE" in our `Value`).
@@ -51,6 +51,20 @@
 }
 
 impl RangePartitionRule {
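+    // For example (values illustrative): a rule of "a < 10 goes to region 1,
+    // 10 <= a < 20 goes to region 2, and the rest goes to region 3" would be
+    // built as:
+    //   RangePartitionRule::new("a", vec![10_i32.into(), 20_i32.into()], vec![1, 2, 3])
+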
+    // FIXME(LFC): remove this "allow", it is only here to appease clippy for now
+    #[allow(dead_code)]
+    pub(crate) fn new(
+        column_name: impl Into<String>,
+        bounds: Vec<Value>,
+        regions: Vec<RegionId>,
+    ) -> Self {
+        Self {
+            column_name: column_name.into(),
+            bounds,
+            regions,
+        }
+    }
+
     fn column_name(&self) -> &String {
         &self.column_name
     }
@@ -72,6 +86,9 @@ impl PartitionRule for RangePartitionRule {
     }
 
     fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionId>, Self::Error> {
+        if exprs.is_empty() {
+            return Ok(self.regions.clone());
+        }
         debug_assert_eq!(
             exprs.len(),
             1,
diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs
new file mode 100644
index 0000000000..879e058ea1
--- /dev/null
+++ b/src/frontend/src/table.rs
@@ -0,0 +1,608 @@
+use std::any::Any;
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use common_query::error::Result as QueryResult;
+use common_query::logical_plan::Expr;
+use common_query::physical_plan::{PhysicalPlan, PhysicalPlanRef};
+use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
+use datafusion::execution::runtime_env::RuntimeEnv;
+use datafusion::logical_plan::Expr as DfExpr;
+use datafusion::physical_plan::Partitioning;
+use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
+use snafu::prelude::*;
+use store_api::storage::RegionId;
+use table::error::Error as TableError;
+use table::metadata::{FilterPushDownType, TableInfoRef};
+use table::Table;
+use tokio::sync::RwLock;
+
+use crate::error::{self, Error, Result};
+use crate::mock::{DatanodeId, DatanodeInstance, TableScanPlan};
+use crate::partitioning::{Operator, PartitionExpr, PartitionRuleRef};
+
+struct DistTable {
+    table_name: String,
+    schema: SchemaRef,
+    partition_rule: PartitionRuleRef<Error>,
+    region_dist_map: HashMap<RegionId, DatanodeId>,
+    datanode_instances: HashMap<DatanodeId, DatanodeInstance>,
+}
+
+#[async_trait]
+impl Table for DistTable {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_info(&self) -> TableInfoRef {
+        unimplemented!()
+    }
+
+    async fn scan(
+        &self,
+        projection: &Option<Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> table::Result<PhysicalPlanRef> {
+        let regions = self.find_regions(filters).map_err(TableError::new)?;
+        let datanodes = self.find_datanodes(regions).map_err(TableError::new)?;
+
+        let partition_execs = datanodes
+            .iter()
+            .map(|(datanode, _regions)| {
+                let datanode_instance = self
+                    .datanode_instances
+                    .get(datanode)
+                    .context(error::DatanodeInstanceSnafu {
+                        datanode: *datanode,
+                    })?
+                    .clone();
+                // TODO(LFC): Pass in "regions" when Datanode supports multi regions for a table.
+                Ok(PartitionExec {
+                    table_name: self.table_name.clone(),
+                    datanode_instance,
+                    projection: projection.clone(),
+                    filters: filters.to_vec(),
+                    limit,
+                    batches: Arc::new(RwLock::new(None)),
+                })
+            })
+            .collect::<Result<Vec<PartitionExec>>>()
+            .map_err(TableError::new)?;
+
+        let dist_scan = DistTableScan {
+            schema: project_schema(self.schema(), projection),
+            partition_execs,
+        };
+        Ok(Arc::new(dist_scan))
+    }
+
+    fn supports_filter_pushdown(&self, _filter: &Expr) -> table::Result<FilterPushDownType> {
+        Ok(FilterPushDownType::Inexact)
+    }
+}
+
+impl DistTable {
+    // TODO(LFC): Finding regions this way seems inefficient; it should be looked into further.
+    fn find_regions(&self, filters: &[Expr]) -> Result<Vec<RegionId>> {
+        let regions = if let Some((first, rest)) = filters.split_first() {
+            let mut target = self.find_regions0(first)?;
+            for filter in rest {
+                let regions = self.find_regions0(filter)?;
+
+                // When multiple filters are provided, the implication is that they must
+                // all be satisfied ("AND" semantics), so we intersect the per-filter
+                // region sets here.
+                target.retain(|x| regions.contains(x));
+
+                // Fail fast: an empty set intersected with anything stays empty.
+                if target.is_empty() {
+                    break;
+                }
+            }
+            target.into_iter().collect::<Vec<_>>()
+        } else {
+            self.partition_rule.find_regions(&[])?
+        };
+        ensure!(
+            !regions.is_empty(),
+            error::FindRegionsSnafu {
+                filters: filters.to_vec()
+            }
+        );
+        Ok(regions)
+    }
+
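+    // For example (values illustrative, with a range rule on column "a" over
+    // bounds [10, 20, 50]): the filters [a <= 10, a < 30] give the per-filter
+    // sets {1, 2} and {1, 2, 3}, which intersect into the final answer {1, 2}.
+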
+    // TODO(LFC): Support other types of filter exprs:
+    //   - BETWEEN and IN (maybe more)
+    //   - exprs with arithmetic like "a + 1 < 10" (should that have been optimized away in the logical plan?)
+    //   - exprs that are neither comparisons nor "AND"/"OR" operations, for example "a LIKE x"
+    fn find_regions0(&self, filter: &Expr) -> Result<HashSet<RegionId>> {
+        let expr = filter.df_expr();
+        match expr {
+            DfExpr::BinaryExpr { left, op, right } if is_compare_op(op) => {
+                let column_op_value = match (left.as_ref(), right.as_ref()) {
+                    (DfExpr::Column(c), DfExpr::Literal(v)) => Some((&c.name, *op, v)),
+                    (DfExpr::Literal(v), DfExpr::Column(c)) => {
+                        Some((&c.name, reverse_operator(op), v))
+                    }
+                    _ => None,
+                };
+                if let Some((column, op, sv)) = column_op_value {
+                    let value = sv
+                        .clone()
+                        .try_into()
+                        .with_context(|_| error::ConvertScalarValueSnafu { value: sv.clone() })?;
+                    return Ok(self
+                        .partition_rule
+                        .find_regions(&[PartitionExpr::new(column, op, value)])?
+                        .into_iter()
+                        .collect::<HashSet<RegionId>>());
+                }
+            }
+            DfExpr::BinaryExpr { left, op, right }
+                if matches!(op, Operator::And | Operator::Or) =>
+            {
+                let left_regions = self.find_regions0(&(*left.clone()).into())?;
+                let right_regions = self.find_regions0(&(*right.clone()).into())?;
+                let regions = match op {
+                    Operator::And => left_regions
+                        .intersection(&right_regions)
+                        .cloned()
+                        .collect::<HashSet<RegionId>>(),
+                    Operator::Or => left_regions
+                        .union(&right_regions)
+                        .cloned()
+                        .collect::<HashSet<RegionId>>(),
+                    _ => unreachable!(),
+                };
+                return Ok(regions);
+            }
+            _ => (),
+        }
+
+        // Return all regions as a safety hatch for partition exprs that are not supported.
+        Ok(self
+            .partition_rule
+            .find_regions(&[])?
+            .into_iter()
+            .collect::<HashSet<RegionId>>())
+    }
+
+    fn find_datanodes(&self, regions: Vec<RegionId>) -> Result<HashMap<DatanodeId, Vec<RegionId>>> {
+        let mut datanodes = HashMap::new();
+        for region in regions.iter() {
+            let datanode = *self
+                .region_dist_map
+                .get(region)
+                .context(error::FindDatanodeSnafu { region: *region })?;
+            datanodes
+                .entry(datanode)
+                .or_insert_with(Vec::new)
+                .push(*region);
+        }
+        Ok(datanodes)
+    }
+}
+
+fn project_schema(table_schema: SchemaRef, projection: &Option<Vec<usize>>) -> SchemaRef {
+    if let Some(projection) = &projection {
+        let columns = table_schema.column_schemas();
+        let projected = projection
+            .iter()
+            .map(|x| columns[*x].clone())
+            .collect::<Vec<ColumnSchema>>();
+        Arc::new(Schema::new(projected))
+    } else {
+        table_schema
+    }
+}
+
+fn is_compare_op(op: &Operator) -> bool {
+    matches!(
+        *op,
+        Operator::Eq
+            | Operator::NotEq
+            | Operator::Lt
+            | Operator::LtEq
+            | Operator::Gt
+            | Operator::GtEq
+    )
+}
+
+fn reverse_operator(op: &Operator) -> Operator {
+    match *op {
+        Operator::Lt => Operator::Gt,
+        Operator::Gt => Operator::Lt,
+        Operator::LtEq => Operator::GtEq,
+        Operator::GtEq => Operator::LtEq,
+        _ => *op,
+    }
+}
+
+#[derive(Debug)]
+struct DistTableScan {
+    schema: SchemaRef,
+    partition_execs: Vec<PartitionExec>,
+}
+
+#[async_trait]
+impl PhysicalPlan for DistTableScan {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(self.partition_execs.len())
+    }
+
+    fn children(&self) -> Vec<PhysicalPlanRef> {
+        vec![]
+    }
+
+    fn with_new_children(&self, _children: Vec<PhysicalPlanRef>) -> QueryResult<PhysicalPlanRef> {
+        unimplemented!()
+    }
+
+    async fn execute(
+        &self,
+        partition: usize,
+        _runtime: Arc<RuntimeEnv>,
+    ) -> QueryResult<SendableRecordBatchStream> {
+        let exec = &self.partition_execs[partition];
+        exec.maybe_init().await;
+        Ok(exec.as_stream().await)
+    }
+}
+
+#[derive(Debug)]
+struct PartitionExec {
+    table_name: String,
+    datanode_instance: DatanodeInstance,
+    projection: Option<Vec<usize>>,
+    filters: Vec<Expr>,
+    limit: Option<usize>,
+    batches: Arc<RwLock<Option<RecordBatches>>>,
+}
+
+impl PartitionExec {
+    async fn maybe_init(&self) {
+        if self.batches.read().await.is_some() {
+            return;
+        }
+
+        let mut batches = self.batches.write().await;
+        if batches.is_some() {
+            return;
+        }
+
+        let plan = TableScanPlan {
+            table_name: self.table_name.clone(),
+            projection: self.projection.clone(),
+            filters: self.filters.clone(),
+            limit: self.limit,
+        };
+        let result = self.datanode_instance.grpc_table_scan(plan).await;
+        let _ = batches.insert(result);
+    }
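+
+    // A note on the check-then-recheck in "maybe_init" above: the read lock is
+    // the fast path once results are cached, and re-checking after taking the
+    // write lock keeps concurrent callers from issuing the same remote scan
+    // twice.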
+
+    async fn as_stream(&self) -> SendableRecordBatchStream {
+        let batches = self.batches.read().await;
+        batches
+            .as_ref()
+            .expect("should have been initialized in \"maybe_init\"")
+            .as_stream()
+    }
+}
+
+// FIXME(LFC): remove this "allow", it is only here to appease clippy for now
+#[allow(clippy::print_stdout)]
+#[cfg(test)]
+mod test {
+    use catalog::RegisterTableRequest;
+    use client::Database;
+    use common_recordbatch::{util, RecordBatch};
+    use datafusion::arrow_print;
+    use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
+    use datafusion_expr::expr_fn::col;
+    use datafusion_expr::expr_fn::{and, binary_expr, or};
+    use datafusion_expr::lit;
+    use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
+    use datanode::instance::Instance;
+    use datatypes::prelude::{ConcreteDataType, VectorRef};
+    use datatypes::schema::{ColumnSchema, Schema};
+    use datatypes::vectors::{Int32Vector, UInt32Vector};
+    use table::test_util::MemTable;
+    use table::TableRef;
+    use tempdir::TempDir;
+
+    use super::*;
+    use crate::partitioning::range::RangePartitionRule;
+
+    #[tokio::test]
+    async fn test_dist_table_scan() {
+        let table = Arc::new(new_dist_table().await);
+
+        // should scan all regions
+        // select * from numbers
+        let projection = None;
+        let filters = vec![];
+        exec_table_scan(table.clone(), projection, filters, None).await;
+        println!();
+
+        // should scan only region 1
+        // select a, row_id from numbers where a < 10
+        let projection = Some(vec![0, 1]);
+        let filters = vec![binary_expr(col("a"), Operator::Lt, lit(10)).into()];
+        exec_table_scan(table.clone(), projection, filters, None).await;
+        println!();
+
+        // should scan regions 1 and 2
+        // select a, row_id from numbers where a < 15
+        let projection = Some(vec![0, 1]);
+        let filters = vec![binary_expr(col("a"), Operator::Lt, lit(15)).into()];
+        exec_table_scan(table.clone(), projection, filters, None).await;
+        println!();
+
+        // should scan regions 2 and 3
+        // select a, row_id from numbers where a < 40 and a >= 10
+        let projection = Some(vec![0, 1]);
+        let filters = vec![and(
+            binary_expr(col("a"), Operator::Lt, lit(40)),
+            binary_expr(col("a"), Operator::GtEq, lit(10)),
+        )
+        .into()];
+        exec_table_scan(table.clone(), projection, filters, None).await;
+        println!();
+
+        // should scan all regions
+        // select a, row_id from numbers where a < 1000 and row_id == 1
+        let projection = Some(vec![0, 1]);
+        let filters = vec![and(
+            binary_expr(col("a"), Operator::Lt, lit(1000)),
+            binary_expr(col("row_id"), Operator::Eq, lit(1)),
+        )
+        .into()];
+        exec_table_scan(table.clone(), projection, filters, None).await;
+    }
+
+    async fn exec_table_scan(
+        table: TableRef,
+        projection: Option<Vec<usize>>,
+        filters: Vec<Expr>,
+        limit: Option<usize>,
+    ) {
+        let table_scan = table
+            .scan(&projection, filters.as_slice(), limit)
+            .await
+            .unwrap();
+
+        for partition in 0..table_scan.output_partitioning().partition_count() {
+            let result = table_scan
+                .execute(partition, Arc::new(RuntimeEnv::default()))
+                .await
+                .unwrap();
+            let recordbatches = util::collect(result).await.unwrap();
+
+            let df_recordbatch = recordbatches
+                .into_iter()
+                .map(|r| r.df_recordbatch)
+                .collect::<Vec<DfRecordBatch>>();
+
+            println!("DataFusion partition {}:", partition);
+            let pretty_print = arrow_print::write(&df_recordbatch);
+            let pretty_print = pretty_print.lines().collect::<Vec<&str>>();
+            pretty_print.iter().for_each(|x| println!("{}", x));
+        }
+    }
+
+    async fn new_dist_table() -> DistTable {
+        let schema = Arc::new(Schema::new(vec![
+            ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
+            ColumnSchema::new("row_id", ConcreteDataType::uint32_datatype(), true),
+        ]));
+
+        // PARTITION BY RANGE (a) (
+        //   PARTITION r1 VALUES LESS THAN (10),
+        //   PARTITION r2 VALUES LESS THAN (20),
+        //   PARTITION r3 VALUES LESS THAN (50),
+        //   PARTITION r4 VALUES LESS THAN (MAXVALUE),
+        // )
+        let partition_rule = RangePartitionRule::new(
+            "a",
+            vec![10_i32.into(), 20_i32.into(), 50_i32.into()],
+            vec![1_u64, 2, 3, 4],
+        );
+
+        let table1 = new_memtable(schema.clone(), (0..5).collect::<Vec<i32>>());
+        let table2 = new_memtable(schema.clone(), (10..15).collect::<Vec<i32>>());
+        let table3 = new_memtable(schema.clone(), (30..35).collect::<Vec<i32>>());
+        let table4 = new_memtable(schema.clone(), (100..105).collect::<Vec<i32>>());
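+        // Each memtable plays the part of one region's data, matching the rule
+        // above: rows 0..5 fall in region 1 (a < 10), 10..15 in region 2
+        // ([10, 20)), 30..35 in region 3 ([20, 50)), and 100..105 in region 4
+        // (a >= 50).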
+
+        let instance1 = create_datanode_instance(1, table1).await;
+        let instance2 = create_datanode_instance(2, table2).await;
+        let instance3 = create_datanode_instance(3, table3).await;
+        let instance4 = create_datanode_instance(4, table4).await;
+
+        let datanode_instances = HashMap::from([
+            (instance1.datanode_id, instance1),
+            (instance2.datanode_id, instance2),
+            (instance3.datanode_id, instance3),
+            (instance4.datanode_id, instance4),
+        ]);
+
+        DistTable {
+            table_name: "dist_numbers".to_string(),
+            schema,
+            partition_rule: Arc::new(partition_rule),
+            region_dist_map: HashMap::from([(1_u64, 1), (2_u64, 2), (3_u64, 3), (4_u64, 4)]),
+            datanode_instances,
+        }
+    }
+
+    fn new_memtable(schema: SchemaRef, data: Vec<i32>) -> MemTable {
+        let rows = data.len() as u32;
+        let columns: Vec<VectorRef> = vec![
+            // column "a"
+            Arc::new(Int32Vector::from_slice(data)),
+            // column "row_id"
+            Arc::new(UInt32Vector::from_slice((1..=rows).collect::<Vec<u32>>())),
+        ];
+        let recordbatch = RecordBatch::new(schema, columns).unwrap();
+        MemTable::new("dist_numbers", recordbatch)
+    }
+
+    async fn create_datanode_instance(
+        datanode_id: DatanodeId,
+        table: MemTable,
+    ) -> DatanodeInstance {
+        let wal_tmp_dir = TempDir::new_in("/tmp", "gt_wal_dist_table_test").unwrap();
+        let data_tmp_dir = TempDir::new_in("/tmp", "gt_data_dist_table_test").unwrap();
+        let opts = DatanodeOptions {
+            wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
+            storage: ObjectStoreConfig::File {
+                data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
+            },
+            ..Default::default()
+        };
+
+        let instance = Arc::new(Instance::new(&opts).await.unwrap());
+        instance.start().await.unwrap();
+
+        let catalog_manager = instance.catalog_manager().clone();
+        catalog_manager
+            .register_table(RegisterTableRequest {
+                catalog: "greptime".to_string(),
+                schema: "public".to_string(),
+                table_name: table.table_name().to_string(),
+                table_id: 1234,
+                table: Arc::new(table),
+            })
+            .await
+            .unwrap();
+
+        let client = crate::tests::create_datanode_client(instance).await;
+        DatanodeInstance::new(
+            datanode_id,
+            catalog_manager,
+            Database::new("greptime", client),
+        )
+    }
+
+    #[tokio::test]
+    async fn test_find_regions() {
+        let table = new_dist_table().await;
+
+        let test = |filters: Vec<Expr>, expect_regions: Vec<RegionId>| {
+            let mut regions = table.find_regions(filters.as_slice()).unwrap();
+            regions.sort();
+
+            assert_eq!(regions, expect_regions);
+        };
+
+        // test simple filters
+        test(
+            vec![binary_expr(col("a"), Operator::Lt, lit(10)).into()], // a < 10
+            vec![1],
+        );
+        test(
+            vec![binary_expr(col("a"), Operator::LtEq, lit(10)).into()], // a <= 10
+            vec![1, 2],
+        );
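+        // Spelling the case above out: "a <= 10" may match both region 1
+        // (a < 10) and region 2 ([10, 20)), because a = 10 itself lives in
+        // region 2; hence the expected regions [1, 2].
+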
binary_expr(col("a"), Operator::Lt, lit(1)), + ), + ) + .into()], // row_id < 1 OR (row_id < 1 AND a > 1) + vec![1, 2, 3, 4], + ); + test( + vec![or( + binary_expr(col("a"), Operator::Lt, lit(20)), + binary_expr(col("a"), Operator::GtEq, lit(20)), + ) + .into()], // a < 20 OR a >= 20 + vec![1, 2, 3, 4], + ); + test( + vec![and( + binary_expr(col("a"), Operator::Lt, lit(20)), + binary_expr(col("a"), Operator::Lt, lit(50)), + ) + .into()], // a < 20 AND a < 50 + vec![1, 2], + ); + + // test failed to find regions by contradictory filters + let regions = table.find_regions( + vec![and( + binary_expr(col("a"), Operator::Lt, lit(20)), + binary_expr(col("a"), Operator::GtEq, lit(20)), + ) + .into()] + .as_slice(), + ); // a < 20 AND a >= 20 + assert!(matches!( + regions.unwrap_err(), + error::Error::FindRegions { .. } + )); + } +} diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs index 11763baf91..b4a408109e 100644 --- a/src/frontend/src/tests.rs +++ b/src/frontend/src/tests.rs @@ -20,7 +20,11 @@ async fn create_datanode_instance() -> Arc { pub(crate) async fn create_frontend_instance() -> Arc { let datanode_instance = create_datanode_instance().await; + let client = create_datanode_client(datanode_instance).await; + Arc::new(Instance::with_client(client)) +} +pub(crate) async fn create_datanode_client(datanode_instance: Arc) -> Client { let (client, server) = tokio::io::duplex(1024); let runtime = Arc::new( @@ -67,6 +71,5 @@ pub(crate) async fn create_frontend_instance() -> Arc { }), ) .unwrap(); - let client = Client::with_manager_and_urls(channel_manager, vec![addr]); - Arc::new(Instance::with_client(client)) + Client::with_manager_and_urls(channel_manager, vec![addr]) }