feat: pushdown filters for some information_schema tables (#3091)

* feat: pushdown scan request to information_schema tables stream

* feat: supports filter pushdown for columns

* feat: supports filter pushdown for some information_schema tables

* fix: typo

* fix: predicate evaluate

* fix: typo

* test: predicates

* fix: comment

* fix: pub mod

* docs: improve comments

* fix: cr comments and supports like predicate

* chore: typo

* fix: cargo toml format

* chore: apply suggestion
This commit is contained in:
dennis zhuang
2024-01-05 15:18:22 +08:00
committed by GitHub
parent e0525dbfeb
commit c2db970687
11 changed files with 887 additions and 81 deletions

1
Cargo.lock generated
View File

@@ -1183,6 +1183,7 @@ version = "0.5.0"
dependencies = [
"api",
"arc-swap",
"arrow",
"arrow-schema",
"async-stream",
"async-trait",

View File

@@ -11,6 +11,7 @@ testing = []
api.workspace = true
arc-swap = "1.0"
arrow-schema.workspace = true
arrow.workspace = true
async-stream.workspace = true
async-trait = "0.1"
build-data = "0.1"

View File

@@ -15,6 +15,7 @@
mod columns;
mod key_column_usage;
mod memory_table;
mod predicate;
mod schemata;
mod table_names;
mod tables;
@@ -29,6 +30,7 @@ use datatypes::schema::SchemaRef;
use futures_util::StreamExt;
use lazy_static::lazy_static;
use paste::paste;
pub(crate) use predicate::Predicates;
use snafu::ResultExt;
use store_api::data_source::DataSource;
use store_api::storage::{ScanRequest, TableId};
@@ -159,7 +161,7 @@ impl InformationSchemaProvider {
fn build_table(&self, name: &str) -> Option<TableRef> {
self.information_table(name).map(|table| {
let table_info = Self::table_info(self.catalog_name.clone(), &table);
let filter_pushdown = FilterPushDownType::Unsupported;
let filter_pushdown = FilterPushDownType::Inexact;
let thin_table = ThinTable::new(table_info, filter_pushdown);
let data_source = Arc::new(InformationTableDataSource::new(table));
@@ -238,7 +240,7 @@ trait InformationTable {
fn schema(&self) -> SchemaRef;
fn to_stream(&self) -> Result<SendableRecordBatchStream>;
fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;
fn table_type(&self) -> TableType {
TableType::Temporary
@@ -272,7 +274,7 @@ impl DataSource for InformationTableDataSource {
&self,
request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
let projection = request.projection;
let projection = request.projection.clone();
let projected_schema = match &projection {
Some(projection) => self.try_project(projection)?,
None => self.table.schema(),
@@ -280,7 +282,7 @@ impl DataSource for InformationTableDataSource {
let stream = self
.table
.to_stream()
.to_stream(request)
.map_err(BoxedError::new)
.context(TablesRecordBatchSnafu)
.map_err(BoxedError::new)?

View File

@@ -29,14 +29,16 @@ use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatc
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, VectorRef};
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;
use store_api::storage::{ScanRequest, TableId};
use super::{InformationTable, COLUMNS};
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::CatalogManager;
pub(super) struct InformationSchemaColumns {
@@ -102,14 +104,14 @@ impl InformationTable for InformationSchemaColumns {
self.schema.clone()
}
fn to_stream(&self) -> Result<SendableRecordBatchStream> {
fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_columns()
.make_columns(Some(request))
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
@@ -165,12 +167,13 @@ impl InformationSchemaColumnsBuilder {
}
/// Construct the `information_schema.columns` virtual table
async fn make_columns(&mut self) -> Result<RecordBatch> {
async fn make_columns(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
if !catalog_manager
@@ -201,6 +204,7 @@ impl InformationSchemaColumnsBuilder {
};
self.add_column(
&predicates,
&catalog_name,
&schema_name,
&table_name,
@@ -219,6 +223,7 @@ impl InformationSchemaColumnsBuilder {
fn add_column(
&mut self,
predicates: &Predicates,
catalog_name: &str,
schema_name: &str,
table_name: &str,
@@ -227,6 +232,19 @@ impl InformationSchemaColumnsBuilder {
) {
let data_type = &column_schema.data_type.name();
let row = [
(TABLE_CATALOG, &Value::from(catalog_name)),
(TABLE_SCHEMA, &Value::from(schema_name)),
(TABLE_NAME, &Value::from(table_name)),
(COLUMN_NAME, &Value::from(column_schema.name.as_str())),
(DATA_TYPE, &Value::from(data_type.as_str())),
(SEMANTIC_TYPE, &Value::from(semantic_type)),
];
if !predicates.eval(&row) {
return;
}
self.catalog_names.push(Some(catalog_name));
self.schema_names.push(Some(schema_name));
self.table_names.push(Some(table_name));
@@ -279,7 +297,7 @@ impl DfPartitionStream for InformationSchemaColumns {
schema,
futures::stream::once(async move {
builder
.make_columns()
.make_columns(None)
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)

View File

@@ -25,17 +25,26 @@ use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder};
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;
use store_api::storage::{ScanRequest, TableId};
use super::KEY_COLUMN_USAGE;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::InformationTable;
use crate::information_schema::{InformationTable, Predicates};
use crate::CatalogManager;
const CONSTRAINT_SCHEMA: &str = "constraint_schema";
const CONSTRAINT_NAME: &str = "constraint_name";
const TABLE_CATALOG: &str = "table_catalog";
const TABLE_SCHEMA: &str = "table_schema";
const TABLE_NAME: &str = "table_name";
const COLUMN_NAME: &str = "column_name";
const ORDINAL_POSITION: &str = "ordinal_position";
/// The virtual table implementation for `information_schema.KEY_COLUMN_USAGE`.
pub(super) struct InformationSchemaKeyColumnUsage {
schema: SchemaRef,
@@ -60,24 +69,16 @@ impl InformationSchemaKeyColumnUsage {
false,
),
ColumnSchema::new(
"constraint_schema",
CONSTRAINT_SCHEMA,
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(
"constraint_name",
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new("table_catalog", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("table_schema", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("table_name", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("column_name", ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
"ordinal_position",
ConcreteDataType::uint32_datatype(),
false,
),
ColumnSchema::new(CONSTRAINT_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(COLUMN_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(ORDINAL_POSITION, ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new(
"position_in_unique_constraint",
ConcreteDataType::uint32_datatype(),
@@ -123,14 +124,14 @@ impl InformationTable for InformationSchemaKeyColumnUsage {
self.schema.clone()
}
fn to_stream(&self) -> Result<SendableRecordBatchStream> {
fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_key_column_usage()
.make_key_column_usage(Some(request))
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
@@ -192,14 +193,14 @@ impl InformationSchemaKeyColumnUsageBuilder {
}
/// Construct the `information_schema.KEY_COLUMN_USAGE` virtual table
async fn make_key_column_usage(&mut self) -> Result<RecordBatch> {
async fn make_key_column_usage(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let predicates = Predicates::from_scan_request(&request);
let mut time_index_constraints = vec![];
let mut primary_constraints = vec![];
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
@@ -223,11 +224,15 @@ impl InformationSchemaKeyColumnUsageBuilder {
for (idx, column) in schema.column_schemas().iter().enumerate() {
if column.is_time_index() {
time_index_constraints.push((
schema_name.clone(),
table_name.clone(),
column.name.clone(),
));
self.add_key_column_usage(
&predicates,
&schema_name,
"TIME INDEX",
&schema_name,
&table_name,
&column.name,
1, //always 1 for time index
);
}
if keys.contains(&idx) {
primary_constraints.push((
@@ -244,22 +249,11 @@ impl InformationSchemaKeyColumnUsageBuilder {
}
}
for (i, (schema_name, table_name, column_name)) in
time_index_constraints.into_iter().enumerate()
{
self.add_key_column_usage(
&schema_name,
"TIME INDEX",
&schema_name,
&table_name,
&column_name,
i as u32 + 1,
);
}
for (i, (schema_name, table_name, column_name)) in
primary_constraints.into_iter().enumerate()
{
self.add_key_column_usage(
&predicates,
&schema_name,
"PRIMARY",
&schema_name,
@@ -274,8 +268,10 @@ impl InformationSchemaKeyColumnUsageBuilder {
// TODO(dimbtp): Foreign key constraint has not `None` value for last 4
// fields, but it is not supported yet.
#[allow(clippy::too_many_arguments)]
fn add_key_column_usage(
&mut self,
predicates: &Predicates,
constraint_schema: &str,
constraint_name: &str,
table_schema: &str,
@@ -283,6 +279,19 @@ impl InformationSchemaKeyColumnUsageBuilder {
column_name: &str,
ordinal_position: u32,
) {
let row = [
(CONSTRAINT_SCHEMA, &Value::from(constraint_schema)),
(CONSTRAINT_NAME, &Value::from(constraint_name)),
(TABLE_SCHEMA, &Value::from(table_schema)),
(TABLE_NAME, &Value::from(table_name)),
(COLUMN_NAME, &Value::from(column_name)),
(ORDINAL_POSITION, &Value::from(ordinal_position)),
];
if !predicates.eval(&row) {
return;
}
self.constraint_catalog.push(Some("def"));
self.constraint_schema.push(Some(constraint_schema));
self.constraint_name.push(Some(constraint_name));
@@ -328,7 +337,7 @@ impl DfPartitionStream for InformationSchemaKeyColumnUsage {
schema,
futures::stream::once(async move {
builder
.make_key_column_usage()
.make_key_column_usage(None)
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)

View File

@@ -26,7 +26,7 @@ use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatc
use datatypes::schema::SchemaRef;
use datatypes::vectors::VectorRef;
use snafu::ResultExt;
use store_api::storage::TableId;
use store_api::storage::{ScanRequest, TableId};
pub use tables::get_schema_columns;
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
@@ -74,7 +74,7 @@ impl InformationTable for MemoryTable {
self.schema.clone()
}
fn to_stream(&self) -> Result<SendableRecordBatchStream> {
fn to_stream(&self, _request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
@@ -169,7 +169,7 @@ mod tests {
assert_eq!("test", table.table_name());
assert_eq!(schema, InformationTable::schema(&table));
let stream = table.to_stream().unwrap();
let stream = table.to_stream(ScanRequest::default()).unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
@@ -198,7 +198,7 @@ mod tests {
assert_eq!("test", table.table_name());
assert_eq!(schema, InformationTable::schema(&table));
let stream = table.to_stream().unwrap();
let stream = table.to_stream(ScanRequest::default()).unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();

View File

@@ -0,0 +1,609 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow::array::StringArray;
use arrow::compute::kernels::comparison;
use common_query::logical_plan::DfExpr;
use datafusion::common::ScalarValue;
use datafusion::logical_expr::expr::Like;
use datafusion::logical_expr::Operator;
use datatypes::value::Value;
use store_api::storage::ScanRequest;
type ColumnName = String;
/// Predicate to filter `information_schema` tables stream,
/// we only support these simple predicates currently.
/// TODO(dennis): supports more predicate types.
#[derive(Clone, PartialEq, Eq, Debug)]
enum Predicate {
Eq(ColumnName, Value),
Like(ColumnName, String, bool),
NotEq(ColumnName, Value),
InList(ColumnName, Vec<Value>),
And(Box<Predicate>, Box<Predicate>),
Or(Box<Predicate>, Box<Predicate>),
Not(Box<Predicate>),
}
impl Predicate {
/// Evaluate the predicate with the row, returns:
/// - `None` when the predicate can't evaluate with the row.
/// - `Some(true)` when the predicate is satisfied,
/// - `Some(false)` when the predicate is not satisfied,
fn eval(&self, row: &[(&str, &Value)]) -> Option<bool> {
match self {
Predicate::Eq(c, v) => {
for (column, value) in row {
if c != column {
continue;
}
return Some(v == *value);
}
}
Predicate::Like(c, pattern, case_insensitive) => {
for (column, value) in row {
if c != column {
continue;
}
let Value::String(bs) = value else {
continue;
};
return like_utf8(bs.as_utf8(), pattern, case_insensitive);
}
}
Predicate::NotEq(c, v) => {
for (column, value) in row {
if c != column {
continue;
}
return Some(v != *value);
}
}
Predicate::InList(c, values) => {
for (column, value) in row {
if c != column {
continue;
}
return Some(values.iter().any(|v| v == *value));
}
}
Predicate::And(left, right) => {
let left = left.eval(row);
// short-circuit
if matches!(left, Some(false)) {
return Some(false);
}
return match (left, right.eval(row)) {
(Some(left), Some(right)) => Some(left && right),
(None, Some(false)) => Some(false),
_ => None,
};
}
Predicate::Or(left, right) => {
let left = left.eval(row);
// short-circuit
if matches!(left, Some(true)) {
return Some(true);
}
return match (left, right.eval(row)) {
(Some(left), Some(right)) => Some(left || right),
(None, Some(true)) => Some(true),
_ => None,
};
}
Predicate::Not(p) => {
let Some(b) = p.eval(row) else {
return None;
};
return Some(!b);
}
}
// Can't evaluate predicate with the row
None
}
/// Try to create a predicate from datafusion [`Expr`], return None if fails.
fn from_expr(expr: DfExpr) -> Option<Predicate> {
match expr {
// NOT expr
DfExpr::Not(expr) => {
let Some(p) = Self::from_expr(*expr) else {
return None;
};
Some(Predicate::Not(Box::new(p)))
}
// expr LIKE pattern
DfExpr::Like(Like {
negated,
expr,
pattern,
case_insensitive,
..
}) if is_column(&expr) && is_string_literal(&pattern) => {
// Safety: ensured by gurad
let DfExpr::Column(c) = *expr else {
unreachable!();
};
let DfExpr::Literal(ScalarValue::Utf8(Some(pattern))) = *pattern else {
unreachable!();
};
let p = Predicate::Like(c.name, pattern, case_insensitive);
if negated {
Some(Predicate::Not(Box::new(p)))
} else {
Some(p)
}
}
// left OP right
DfExpr::BinaryExpr(bin) => match (*bin.left, bin.op, *bin.right) {
// left == right
(DfExpr::Literal(scalar), Operator::Eq, DfExpr::Column(c))
| (DfExpr::Column(c), Operator::Eq, DfExpr::Literal(scalar)) => {
let Ok(v) = Value::try_from(scalar) else {
return None;
};
Some(Predicate::Eq(c.name, v))
}
// left != right
(DfExpr::Literal(scalar), Operator::NotEq, DfExpr::Column(c))
| (DfExpr::Column(c), Operator::NotEq, DfExpr::Literal(scalar)) => {
let Ok(v) = Value::try_from(scalar) else {
return None;
};
Some(Predicate::NotEq(c.name, v))
}
// left AND right
(left, Operator::And, right) => {
let Some(left) = Self::from_expr(left) else {
return None;
};
let Some(right) = Self::from_expr(right) else {
return None;
};
Some(Predicate::And(Box::new(left), Box::new(right)))
}
// left OR right
(left, Operator::Or, right) => {
let Some(left) = Self::from_expr(left) else {
return None;
};
let Some(right) = Self::from_expr(right) else {
return None;
};
Some(Predicate::Or(Box::new(left), Box::new(right)))
}
_ => None,
},
// [NOT] IN (LIST)
DfExpr::InList(list) => {
match (*list.expr, list.list, list.negated) {
// column [NOT] IN (v1, v2, v3, ...)
(DfExpr::Column(c), list, negated) if is_all_scalars(&list) => {
let mut values = Vec::with_capacity(list.len());
for scalar in list {
// Safety: checked by `is_all_scalars`
let DfExpr::Literal(scalar) = scalar else {
unreachable!();
};
let Ok(value) = Value::try_from(scalar) else {
return None;
};
values.push(value);
}
let predicate = Predicate::InList(c.name, values);
if negated {
Some(Predicate::Not(Box::new(predicate)))
} else {
Some(predicate)
}
}
_ => None,
}
}
_ => None,
}
}
}
/// Perform SQL left LIKE right, return `None` if fail to evaluate.
/// - `s` the target string
/// - `pattern` the pattern just like '%abc'
/// - `case_insensitive` whether to perform case-insensitive like or not.
fn like_utf8(s: &str, pattern: &str, case_insensitive: &bool) -> Option<bool> {
let array = StringArray::from(vec![s]);
let patterns = StringArray::new_scalar(pattern);
let Ok(booleans) = (if *case_insensitive {
comparison::ilike(&array, &patterns)
} else {
comparison::like(&array, &patterns)
}) else {
return None;
};
// Safety: at least one value in result
Some(booleans.value(0))
}
fn is_string_literal(expr: &DfExpr) -> bool {
matches!(expr, DfExpr::Literal(ScalarValue::Utf8(Some(_))))
}
fn is_column(expr: &DfExpr) -> bool {
matches!(expr, DfExpr::Column(_))
}
/// A list of predicate
pub struct Predicates {
predicates: Vec<Predicate>,
}
impl Predicates {
/// Try its best to create predicates from [`ScanRequest`].
pub fn from_scan_request(request: &Option<ScanRequest>) -> Predicates {
if let Some(request) = request {
let mut predicates = Vec::with_capacity(request.filters.len());
for filter in &request.filters {
if let Some(predicate) = Predicate::from_expr(filter.df_expr().clone()) {
predicates.push(predicate);
}
}
Self { predicates }
} else {
Self {
predicates: Vec::new(),
}
}
}
/// Evaluate the predicates with the row.
/// returns true when all the predicates are satisfied or can't be evaluated.
pub fn eval(&self, row: &[(&str, &Value)]) -> bool {
// fast path
if self.predicates.is_empty() {
return true;
}
self.predicates
.iter()
.filter_map(|p| p.eval(row))
.all(|b| b)
}
}
/// Returns true when the values are all [`DfExpr::Literal`].
fn is_all_scalars(list: &[DfExpr]) -> bool {
list.iter().all(|v| matches!(v, DfExpr::Literal(_)))
}
#[cfg(test)]
mod tests {
use datafusion::common::{Column, ScalarValue};
use datafusion::logical_expr::expr::InList;
use datafusion::logical_expr::BinaryExpr;
use super::*;
#[test]
fn test_predicate_eval() {
let a_col = "a".to_string();
let b_col = "b".to_string();
let a_value = Value::from("a_value");
let b_value = Value::from("b_value");
let wrong_value = Value::from("wrong_value");
let a_row = [(a_col.as_str(), &a_value)];
let b_row = [("b", &wrong_value)];
let wrong_row = [(a_col.as_str(), &wrong_value)];
// Predicate::Eq
let p = Predicate::Eq(a_col.clone(), a_value.clone());
assert!(p.eval(&a_row).unwrap());
assert!(p.eval(&b_row).is_none());
assert!(!p.eval(&wrong_row).unwrap());
// Predicate::NotEq
let p = Predicate::NotEq(a_col.clone(), a_value.clone());
assert!(!p.eval(&a_row).unwrap());
assert!(p.eval(&b_row).is_none());
assert!(p.eval(&wrong_row).unwrap());
// Predicate::InList
let p = Predicate::InList(a_col.clone(), vec![a_value.clone(), b_value.clone()]);
assert!(p.eval(&a_row).unwrap());
assert!(p.eval(&b_row).is_none());
assert!(!p.eval(&wrong_row).unwrap());
assert!(p.eval(&[(&a_col, &b_value)]).unwrap());
let p1 = Predicate::Eq(a_col.clone(), a_value.clone());
let p2 = Predicate::Eq(b_col.clone(), b_value.clone());
let row = [(a_col.as_str(), &a_value), (b_col.as_str(), &b_value)];
let wrong_row = [(a_col.as_str(), &a_value), (b_col.as_str(), &wrong_value)];
//Predicate::And
let p = Predicate::And(Box::new(p1.clone()), Box::new(p2.clone()));
assert!(p.eval(&row).unwrap());
assert!(!p.eval(&wrong_row).unwrap());
assert!(p.eval(&[]).is_none());
assert!(p.eval(&[("c", &a_value)]).is_none());
assert!(!p
.eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &a_value)])
.unwrap());
assert!(!p
.eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &b_value)])
.unwrap());
assert!(p
.eval(&[(a_col.as_ref(), &a_value), ("c", &a_value)])
.is_none());
assert!(!p
.eval(&[(a_col.as_ref(), &b_value), ("c", &a_value)])
.unwrap());
//Predicate::Or
let p = Predicate::Or(Box::new(p1), Box::new(p2));
assert!(p.eval(&row).unwrap());
assert!(p.eval(&wrong_row).unwrap());
assert!(p.eval(&[]).is_none());
assert!(p.eval(&[("c", &a_value)]).is_none());
assert!(!p
.eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &a_value)])
.unwrap());
assert!(p
.eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &b_value)])
.unwrap());
assert!(p
.eval(&[(a_col.as_ref(), &a_value), ("c", &a_value)])
.unwrap());
assert!(p
.eval(&[(a_col.as_ref(), &b_value), ("c", &a_value)])
.is_none());
}
#[test]
fn test_predicate_like() {
// case insensitive
let expr = DfExpr::Like(Like {
negated: false,
expr: Box::new(column("a")),
pattern: Box::new(string_literal("%abc")),
case_insensitive: true,
escape_char: None,
});
let p = Predicate::from_expr(expr).unwrap();
assert!(
matches!(&p, Predicate::Like(c, pattern, case_insensitive) if
c == "a"
&& pattern == "%abc"
&& *case_insensitive)
);
let match_row = [
("a", &Value::from("hello AbC")),
("b", &Value::from("b value")),
];
let unmatch_row = [("a", &Value::from("bca")), ("b", &Value::from("b value"))];
assert!(p.eval(&match_row).unwrap());
assert!(!p.eval(&unmatch_row).unwrap());
assert!(p.eval(&[]).is_none());
// case sensitive
let expr = DfExpr::Like(Like {
negated: false,
expr: Box::new(column("a")),
pattern: Box::new(string_literal("%abc")),
case_insensitive: false,
escape_char: None,
});
let p = Predicate::from_expr(expr).unwrap();
assert!(
matches!(&p, Predicate::Like(c, pattern, case_insensitive) if
c == "a"
&& pattern == "%abc"
&& !*case_insensitive)
);
assert!(!p.eval(&match_row).unwrap());
assert!(!p.eval(&unmatch_row).unwrap());
assert!(p.eval(&[]).is_none());
// not like
let expr = DfExpr::Like(Like {
negated: true,
expr: Box::new(column("a")),
pattern: Box::new(string_literal("%abc")),
case_insensitive: true,
escape_char: None,
});
let p = Predicate::from_expr(expr).unwrap();
assert!(!p.eval(&match_row).unwrap());
assert!(p.eval(&unmatch_row).unwrap());
assert!(p.eval(&[]).is_none());
}
fn column(name: &str) -> DfExpr {
DfExpr::Column(Column {
relation: None,
name: name.to_string(),
})
}
fn string_literal(v: &str) -> DfExpr {
DfExpr::Literal(ScalarValue::Utf8(Some(v.to_string())))
}
fn match_string_value(v: &Value, expected: &str) -> bool {
matches!(v, Value::String(bs) if bs.as_utf8() == expected)
}
fn match_string_values(vs: &[Value], expected: &[&str]) -> bool {
assert_eq!(vs.len(), expected.len());
let mut result = true;
for (i, v) in vs.iter().enumerate() {
result = result && match_string_value(v, expected[i]);
}
result
}
fn mock_exprs() -> (DfExpr, DfExpr) {
let expr1 = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(column("a")),
op: Operator::Eq,
right: Box::new(string_literal("a_value")),
});
let expr2 = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(column("b")),
op: Operator::NotEq,
right: Box::new(string_literal("b_value")),
});
(expr1, expr2)
}
#[test]
fn test_predicate_from_expr() {
let (expr1, expr2) = mock_exprs();
let p1 = Predicate::from_expr(expr1.clone()).unwrap();
assert!(matches!(&p1, Predicate::Eq(column, v) if column == "a"
&& match_string_value(v, "a_value")));
let p2 = Predicate::from_expr(expr2.clone()).unwrap();
assert!(matches!(&p2, Predicate::NotEq(column, v) if column == "b"
&& match_string_value(v, "b_value")));
let and_expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(expr1.clone()),
op: Operator::And,
right: Box::new(expr2.clone()),
});
let or_expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(expr1.clone()),
op: Operator::Or,
right: Box::new(expr2.clone()),
});
let not_expr = DfExpr::Not(Box::new(expr1.clone()));
let and_p = Predicate::from_expr(and_expr).unwrap();
assert!(matches!(and_p, Predicate::And(left, right) if *left == p1 && *right == p2));
let or_p = Predicate::from_expr(or_expr).unwrap();
assert!(matches!(or_p, Predicate::Or(left, right) if *left == p1 && *right == p2));
let not_p = Predicate::from_expr(not_expr).unwrap();
assert!(matches!(not_p, Predicate::Not(p) if *p == p1));
let inlist_expr = DfExpr::InList(InList {
expr: Box::new(column("a")),
list: vec![string_literal("a1"), string_literal("a2")],
negated: false,
});
let inlist_p = Predicate::from_expr(inlist_expr).unwrap();
assert!(matches!(&inlist_p, Predicate::InList(c, values) if c == "a"
&& match_string_values(values, &["a1", "a2"])));
let inlist_expr = DfExpr::InList(InList {
expr: Box::new(column("a")),
list: vec![string_literal("a1"), string_literal("a2")],
negated: true,
});
let inlist_p = Predicate::from_expr(inlist_expr).unwrap();
assert!(matches!(inlist_p, Predicate::Not(p) if
matches!(&*p,
Predicate::InList(c, values) if c == "a"
&& match_string_values(values, &["a1", "a2"]))));
}
#[test]
fn test_predicates_from_scan_request() {
let predicates = Predicates::from_scan_request(&None);
assert!(predicates.predicates.is_empty());
let (expr1, expr2) = mock_exprs();
let request = ScanRequest {
filters: vec![expr1.into(), expr2.into()],
..Default::default()
};
let predicates = Predicates::from_scan_request(&Some(request));
assert_eq!(2, predicates.predicates.len());
assert!(
matches!(&predicates.predicates[0], Predicate::Eq(column, v) if column == "a"
&& match_string_value(v, "a_value"))
);
assert!(
matches!(&predicates.predicates[1], Predicate::NotEq(column, v) if column == "b"
&& match_string_value(v, "b_value"))
);
}
#[test]
fn test_predicates_eval_row() {
let wrong_row = [
("a", &Value::from("a_value")),
("b", &Value::from("b_value")),
("c", &Value::from("c_value")),
];
let row = [
("a", &Value::from("a_value")),
("b", &Value::from("not_b_value")),
("c", &Value::from("c_value")),
];
let c_row = [("c", &Value::from("c_value"))];
// test empty predicates, always returns true
let predicates = Predicates::from_scan_request(&None);
assert!(predicates.eval(&row));
assert!(predicates.eval(&wrong_row));
assert!(predicates.eval(&c_row));
let (expr1, expr2) = mock_exprs();
let request = ScanRequest {
filters: vec![expr1.into(), expr2.into()],
..Default::default()
};
let predicates = Predicates::from_scan_request(&Some(request));
assert!(predicates.eval(&row));
assert!(!predicates.eval(&wrong_row));
assert!(predicates.eval(&c_row));
}
}

View File

@@ -25,17 +25,23 @@ use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::StringVectorBuilder;
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;
use store_api::storage::{ScanRequest, TableId};
use super::SCHEMATA;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::InformationTable;
use crate::information_schema::{InformationTable, Predicates};
use crate::CatalogManager;
const CATALOG_NAME: &str = "catalog_name";
const SCHEMA_NAME: &str = "schema_name";
const DEFAULT_CHARACTER_SET_NAME: &str = "default_character_set_name";
const DEFAULT_COLLATION_NAME: &str = "default_collation_name";
/// The `information_schema.schemata` table implementation.
pub(super) struct InformationSchemaSchemata {
schema: SchemaRef,
@@ -54,15 +60,15 @@ impl InformationSchemaSchemata {
pub(crate) fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
ColumnSchema::new("catalog_name", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("schema_name", ConcreteDataType::string_datatype(), false),
ColumnSchema::new(CATALOG_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(SCHEMA_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
"default_character_set_name",
DEFAULT_CHARACTER_SET_NAME,
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(
"default_collation_name",
DEFAULT_COLLATION_NAME,
ConcreteDataType::string_datatype(),
false,
),
@@ -92,14 +98,14 @@ impl InformationTable for InformationSchemaSchemata {
self.schema.clone()
}
fn to_stream(&self) -> Result<SendableRecordBatchStream> {
fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_schemata()
.make_schemata(Some(request))
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
@@ -147,12 +153,13 @@ impl InformationSchemaSchemataBuilder {
}
/// Construct the `information_schema.schemata` virtual table
async fn make_schemata(&mut self) -> Result<RecordBatch> {
async fn make_schemata(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
if !catalog_manager
@@ -162,13 +169,24 @@ impl InformationSchemaSchemataBuilder {
continue;
}
self.add_schema(&catalog_name, &schema_name);
self.add_schema(&predicates, &catalog_name, &schema_name);
}
self.finish()
}
fn add_schema(&mut self, catalog_name: &str, schema_name: &str) {
fn add_schema(&mut self, predicates: &Predicates, catalog_name: &str, schema_name: &str) {
let row = [
(CATALOG_NAME, &Value::from(catalog_name)),
(SCHEMA_NAME, &Value::from(schema_name)),
(DEFAULT_CHARACTER_SET_NAME, &Value::from("utf8")),
(DEFAULT_COLLATION_NAME, &Value::from("utf8_bin")),
];
if !predicates.eval(&row) {
return;
}
self.catalog_names.push(Some(catalog_name));
self.schema_names.push(Some(schema_name));
self.charset_names.push(Some("utf8"));
@@ -200,7 +218,7 @@ impl DfPartitionStream for InformationSchemaSchemata {
schema,
futures::stream::once(async move {
builder
.make_schemata()
.make_schemata(None)
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)

View File

@@ -25,18 +25,26 @@ use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder};
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;
use store_api::storage::{ScanRequest, TableId};
use table::metadata::TableType;
use super::TABLES;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::InformationTable;
use crate::information_schema::{InformationTable, Predicates};
use crate::CatalogManager;
const TABLE_CATALOG: &str = "table_catalog";
const TABLE_SCHEMA: &str = "table_schema";
const TABLE_NAME: &str = "table_name";
const TABLE_TYPE: &str = "table_type";
const TABLE_ID: &str = "table_id";
const ENGINE: &str = "engine";
pub(super) struct InformationSchemaTables {
schema: SchemaRef,
catalog_name: String,
@@ -54,12 +62,12 @@ impl InformationSchemaTables {
pub(crate) fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
ColumnSchema::new("table_catalog", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("table_schema", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("table_name", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("table_type", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("table_id", ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new("engine", ConcreteDataType::string_datatype(), true),
ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_TYPE, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true),
]))
}
@@ -85,14 +93,14 @@ impl InformationTable for InformationSchemaTables {
self.schema.clone()
}
fn to_stream(&self) -> Result<SendableRecordBatchStream> {
fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_tables()
.make_tables(Some(request))
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
@@ -142,12 +150,13 @@ impl InformationSchemaTablesBuilder {
}
/// Construct the `information_schema.tables` virtual table
async fn make_tables(&mut self) -> Result<RecordBatch> {
async fn make_tables(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
if !catalog_manager
@@ -167,6 +176,7 @@ impl InformationSchemaTablesBuilder {
{
let table_info = table.table_info();
self.add_table(
&predicates,
&catalog_name,
&schema_name,
&table_name,
@@ -183,8 +193,10 @@ impl InformationSchemaTablesBuilder {
self.finish()
}
#[allow(clippy::too_many_arguments)]
fn add_table(
&mut self,
predicates: &Predicates,
catalog_name: &str,
schema_name: &str,
table_name: &str,
@@ -192,14 +204,27 @@ impl InformationSchemaTablesBuilder {
table_id: Option<u32>,
engine: Option<&str>,
) {
self.catalog_names.push(Some(catalog_name));
self.schema_names.push(Some(schema_name));
self.table_names.push(Some(table_name));
self.table_types.push(Some(match table_type {
let table_type = match table_type {
TableType::Base => "BASE TABLE",
TableType::View => "VIEW",
TableType::Temporary => "LOCAL TEMPORARY",
}));
};
let row = [
(TABLE_CATALOG, &Value::from(catalog_name)),
(TABLE_SCHEMA, &Value::from(schema_name)),
(TABLE_NAME, &Value::from(table_name)),
(TABLE_TYPE, &Value::from(table_type)),
];
if !predicates.eval(&row) {
return;
}
self.catalog_names.push(Some(catalog_name));
self.schema_names.push(Some(schema_name));
self.table_names.push(Some(table_name));
self.table_types.push(Some(table_type));
self.table_ids.push(table_id);
self.engines.push(engine);
}
@@ -229,7 +254,7 @@ impl DfPartitionStream for InformationSchemaTables {
schema,
futures::stream::once(async move {
builder
.make_tables()
.make_tables(None)
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)

View File

@@ -324,6 +324,40 @@ order by table_name;
| foo |
+------------+
select table_name
from information_schema.tables
where table_schema in ('my_db', 'public')
order by table_name;
+------------+
| table_name |
+------------+
| foo |
| numbers |
+------------+
select table_name
from information_schema.tables
where table_schema like 'my%'
order by table_name;
+------------+
| table_name |
+------------+
| foo |
+------------+
select table_name
from information_schema.tables
where table_schema not in ('my_db', 'information_schema')
order by table_name;
+------------+
| table_name |
+------------+
| numbers |
+------------+
select table_catalog, table_schema, table_name, table_type, engine
from information_schema.tables
where table_catalog = 'greptime'
@@ -350,6 +384,22 @@ order by table_schema, table_name, column_name;
| greptime | my_db | foo | ts | TimestampMillisecond | TIMESTAMP |
+---------------+--------------+------------+-------------+----------------------+---------------+
-- test query filter for columns --
select table_catalog, table_schema, table_name, column_name, data_type, semantic_type
from information_schema.columns
where table_catalog = 'greptime'
and (table_schema in ('public')
or
table_schema == 'my_db')
order by table_schema, table_name, column_name;
+---------------+--------------+------------+-------------+----------------------+---------------+
| table_catalog | table_schema | table_name | column_name | data_type | semantic_type |
+---------------+--------------+------------+-------------+----------------------+---------------+
| greptime | my_db | foo | ts | TimestampMillisecond | TIMESTAMP |
| greptime | public | numbers | number | UInt32 | TAG |
+---------------+--------------+------------+-------------+----------------------+---------------+
use public;
Affected Rows: 0
@@ -362,6 +412,44 @@ use information_schema;
Affected Rows: 0
-- test query filter for key_column_usage --
select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME = 'TIME INDEX';
+--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+
| constraint_catalog | constraint_schema | constraint_name | table_catalog | table_schema | table_name | column_name | ordinal_position | position_in_unique_constraint | referenced_table_schema | referenced_table_name | referenced_column_name |
+--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+
| def | my_db | TIME INDEX | def | my_db | foo | ts | 1 | | | | |
+--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+
select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME != 'TIME INDEX';
+--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+
| constraint_catalog | constraint_schema | constraint_name | table_catalog | table_schema | table_name | column_name | ordinal_position | position_in_unique_constraint | referenced_table_schema | referenced_table_name | referenced_column_name |
+--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+
| def | public | PRIMARY | def | public | numbers | number | 1 | | | | |
+--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+
select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME LIKE '%INDEX';
+--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+
| constraint_catalog | constraint_schema | constraint_name | table_catalog | table_schema | table_name | column_name | ordinal_position | position_in_unique_constraint | referenced_table_schema | referenced_table_name | referenced_column_name |
+--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+
| def | my_db | TIME INDEX | def | my_db | foo | ts | 1 | | | | |
+--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+
select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME NOT LIKE '%INDEX';
+--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+
| constraint_catalog | constraint_schema | constraint_name | table_catalog | table_schema | table_name | column_name | ordinal_position | position_in_unique_constraint | referenced_table_schema | referenced_table_name | referenced_column_name |
+--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+
| def | public | PRIMARY | def | public | numbers | number | 1 | | | | |
+--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+
select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME == 'TIME INDEX' AND CONSTRAINT_SCHEMA != 'my_db';
++
++
-- schemata --
desc table schemata;

View File

@@ -24,6 +24,21 @@ from information_schema.tables
where table_schema = 'my_db'
order by table_name;
select table_name
from information_schema.tables
where table_schema in ('my_db', 'public')
order by table_name;
select table_name
from information_schema.tables
where table_schema like 'my%'
order by table_name;
select table_name
from information_schema.tables
where table_schema not in ('my_db', 'information_schema')
order by table_name;
select table_catalog, table_schema, table_name, table_type, engine
from information_schema.tables
where table_catalog = 'greptime'
@@ -38,12 +53,32 @@ where table_catalog = 'greptime'
and table_schema != 'information_schema'
order by table_schema, table_name, column_name;
-- test query filter for columns --
select table_catalog, table_schema, table_name, column_name, data_type, semantic_type
from information_schema.columns
where table_catalog = 'greptime'
and (table_schema in ('public')
or
table_schema == 'my_db')
order by table_schema, table_name, column_name;
use public;
drop schema my_db;
use information_schema;
-- test query filter for key_column_usage --
select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME = 'TIME INDEX';
select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME != 'TIME INDEX';
select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME LIKE '%INDEX';
select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME NOT LIKE '%INDEX';
select * from KEY_COLUMN_USAGE where CONSTRAINT_NAME == 'TIME INDEX' AND CONSTRAINT_SCHEMA != 'my_db';
-- schemata --
desc table schemata;