Merge pull request #745

* fix nyc-taxi and util

* Merge branch 'replace-arrow2' into fix-others

* fix substrait

* fix warnings and errors in tests
Author: Ruihang Xia
Date: 2022-12-13 16:59:28 +08:00
Committed by: GitHub
Commit: a712382fba (parent: 4b644aa482)
7 changed files with 108 additions and 43 deletions


@@ -20,7 +20,6 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;
use arrow::array::{ArrayRef, PrimitiveArray, StringArray, TimestampNanosecondArray};
@@ -32,9 +31,7 @@ use client::api::v1::column::Values;
use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateExpr, InsertExpr};
use client::{Client, Database, Select};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use tokio::task::JoinSet;
const DATABASE_NAME: &str = "greptime";
@@ -86,10 +83,14 @@ async fn write_data(
pb_style: ProgressStyle,
) -> u128 {
let file = std::fs::File::open(&path).unwrap();
let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
let row_num = file_reader.metadata().file_metadata().num_rows();
let record_batch_reader = ParquetFileArrowReader::new(file_reader)
.get_record_reader(batch_size)
let record_batch_reader_builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
let row_num = record_batch_reader_builder
.metadata()
.file_metadata()
.num_rows();
let record_batch_reader = record_batch_reader_builder
.with_batch_size(batch_size)
.build()
.unwrap();
let progress_bar = mpb.add(ProgressBar::new(row_num as _));
progress_bar.set_style(pb_style);
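
For context on this hunk: the parquet crate replaced the SerializedFileReader + ParquetFileArrowReader pair with a single builder that parses the footer up front, so the row count is available before the reader is built. A minimal sketch of the new API, with illustrative path/batch_size parameters:

    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

    fn read_parquet(path: &std::path::Path, batch_size: usize) {
        let file = std::fs::File::open(path).unwrap();
        // try_new parses the footer, so metadata is usable before build()
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let row_num = builder.metadata().file_metadata().num_rows();
        println!("total rows: {row_num}");
        let reader = builder.with_batch_size(batch_size).build().unwrap();
        for batch in reader {
            // each item is a Result wrapping an arrow RecordBatch
            let batch = batch.unwrap();
            println!("read a batch of {} rows", batch.num_rows());
        }
    }
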
@@ -210,9 +211,10 @@ fn build_values(column: &ArrayRef) -> Values {
| DataType::FixedSizeList(_, _)
| DataType::LargeList(_)
| DataType::Struct(_)
| DataType::Union(_, _)
| DataType::Union(_, _, _)
| DataType::Dictionary(_, _)
| DataType::Decimal(_, _)
| DataType::Decimal128(_, _)
| DataType::Decimal256(_, _)
| DataType::Map(_, _) => todo!(),
}
}
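
The two changed arms track enum changes in arrow-rs: Union gained a third field (the union mode) and the old Decimal variant split into 128-bit and 256-bit forms. A hedged sketch of matching the new shapes with wildcard patterns (exact field types depend on the pinned arrow version):

    use arrow::datatypes::DataType;

    fn is_still_unsupported(dt: &DataType) -> bool {
        matches!(
            dt,
            DataType::Union(_, _, _)         // fields, type ids, union mode
                | DataType::Decimal128(_, _) // precision, scale
                | DataType::Decimal256(_, _)
        )
    }
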


@@ -14,7 +14,7 @@
use std::collections::HashMap;
use datafusion::logical_plan::DFSchemaRef;
use datafusion::common::DFSchemaRef;
use substrait_proto::protobuf::extensions::simple_extension_declaration::{
ExtensionFunction, MappingType,
};


@@ -15,8 +15,8 @@
use std::collections::VecDeque;
use std::str::FromStr;
use datafusion::logical_plan::{Column, Expr};
use datafusion_expr::{expr_fn, lit, BuiltinScalarFunction, Operator};
use datafusion::common::Column;
use datafusion_expr::{expr_fn, lit, Between, BinaryExpr, BuiltinScalarFunction, Expr, Operator};
use datatypes::schema::Schema;
use snafu::{ensure, OptionExt};
use substrait_proto::protobuf::expression::field_reference::ReferenceType as FieldReferenceType;
@@ -311,21 +311,21 @@ pub fn convert_scalar_function(
// skip GetIndexedField, unimplemented.
"between" => {
ensure_arg_len(3)?;
Expr::Between {
Expr::Between(Between {
expr: Box::new(inputs.pop_front().unwrap()),
negated: false,
low: Box::new(inputs.pop_front().unwrap()),
high: Box::new(inputs.pop_front().unwrap()),
}
})
}
"not_between" => {
ensure_arg_len(3)?;
Expr::Between {
Expr::Between(Between {
expr: Box::new(inputs.pop_front().unwrap()),
negated: true,
low: Box::new(inputs.pop_front().unwrap()),
high: Box::new(inputs.pop_front().unwrap()),
}
})
}
// skip Case, is covered in substrait::SwitchExpression.
// skip Cast and TryCast, is covered in substrait::Cast.
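
Both Between arms change shape because this DataFusion release turned several struct-like Expr variants into tuple variants wrapping named structs. A minimal construction sketch, with an illustrative column and bounds:

    use datafusion_expr::{col, lit, Between, Expr};

    fn a_between_1_and_10() -> Expr {
        Expr::Between(Between {
            expr: Box::new(col("a")),
            negated: false,
            low: Box::new(lit(1)),
            high: Box::new(lit(10)),
        })
    }
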
@@ -477,7 +477,7 @@ pub fn expression_from_df_expr(
rex_type: Some(RexType::Literal(l)),
}
}
Expr::BinaryExpr { left, op, right } => {
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
let left = expression_from_df_expr(ctx, left, schema)?;
let right = expression_from_df_expr(ctx, right, schema)?;
let arguments = utils::expression_to_argument(vec![left, right]);
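
Expr::BinaryExpr got the same treatment, so destructuring now goes through the inner BinaryExpr struct. A small sketch of walking such an expression, under the same datafusion_expr imports as above:

    use datafusion_expr::{BinaryExpr, Expr};

    fn describe(e: &Expr) -> String {
        match e {
            // bind the boxed operands and the operator in one pattern
            Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
                format!("({left} {op} {right})")
            }
            other => other.to_string(),
        }
    }
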
@@ -518,12 +518,12 @@ pub fn expression_from_df_expr(
name: expr.to_string(),
}
.fail()?,
Expr::Between {
Expr::Between(Between {
expr,
negated,
low,
high,
} => {
}) => {
let expr = expression_from_df_expr(ctx, expr, schema)?;
let low = expression_from_df_expr(ctx, low, schema)?;
let high = expression_from_df_expr(ctx, high, schema)?;
@@ -564,7 +564,21 @@ pub fn expression_from_df_expr(
| Expr::WindowFunction { .. }
| Expr::AggregateUDF { .. }
| Expr::InList { .. }
| Expr::Wildcard => UnsupportedExprSnafu {
| Expr::Wildcard
| Expr::Like(_)
| Expr::ILike(_)
| Expr::SimilarTo(_)
| Expr::IsTrue(_)
| Expr::IsFalse(_)
| Expr::IsUnknown(_)
| Expr::IsNotTrue(_)
| Expr::IsNotFalse(_)
| Expr::IsNotUnknown(_)
| Expr::Exists { .. }
| Expr::InSubquery { .. }
| Expr::ScalarSubquery(..)
| Expr::QualifiedWildcard { .. } => todo!(),
Expr::GroupingSet(_) => UnsupportedExprSnafu {
name: expr.to_string(),
}
.fail()?,
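
Most of the added arms exist because this DataFusion version introduced new Expr variants (Like, ILike, the Is* family, and several subquery forms). The simple boolean ones just box a sub-expression; a sketch:

    use datafusion_expr::{col, Expr};

    fn flag_checks() -> Vec<Expr> {
        vec![
            // each Is* variant wraps its operand expression in a Box
            Expr::IsTrue(Box::new(col("flag"))),
            Expr::IsNotFalse(Box::new(col("flag"))),
        ]
    }
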
@@ -628,6 +642,10 @@ mod utils {
Operator::RegexNotIMatch => "regex_not_i_match",
Operator::BitwiseAnd => "bitwise_and",
Operator::BitwiseOr => "bitwise_or",
Operator::BitwiseXor => "bitwise_xor",
Operator::BitwiseShiftRight => "bitwise_shift_right",
Operator::BitwiseShiftLeft => "bitwise_shift_left",
Operator::StringConcat => "string_concat",
}
}
@@ -679,7 +697,6 @@ mod utils {
BuiltinScalarFunction::Sqrt => "sqrt",
BuiltinScalarFunction::Tan => "tan",
BuiltinScalarFunction::Trunc => "trunc",
BuiltinScalarFunction::Array => "make_array",
BuiltinScalarFunction::Ascii => "ascii",
BuiltinScalarFunction::BitLength => "bit_length",
BuiltinScalarFunction::Btrim => "btrim",
@@ -723,6 +740,17 @@ mod utils {
BuiltinScalarFunction::Trim => "trim",
BuiltinScalarFunction::Upper => "upper",
BuiltinScalarFunction::RegexpMatch => "regexp_match",
BuiltinScalarFunction::Atan2 => "atan2",
BuiltinScalarFunction::Coalesce => "coalesce",
BuiltinScalarFunction::Power => "power",
BuiltinScalarFunction::MakeArray => "make_array",
BuiltinScalarFunction::DateBin => "date_bin",
BuiltinScalarFunction::FromUnixtime => "from_unixtime",
BuiltinScalarFunction::CurrentDate => "current_date",
BuiltinScalarFunction::CurrentTime => "current_time",
BuiltinScalarFunction::Uuid => "uuid",
BuiltinScalarFunction::Struct => "struct",
BuiltinScalarFunction::ArrowTypeof => "arrow_type_of",
}
}
}


@@ -19,10 +19,10 @@ use catalog::CatalogManagerRef;
use common_error::prelude::BoxedError;
use common_telemetry::debug;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::datasource::TableProvider;
use datafusion::logical_plan::plan::Filter;
use datafusion::logical_plan::{LogicalPlan, TableScan, ToDFSchema};
use datafusion::common::ToDFSchema;
use datafusion::datasource::DefaultTableSource;
use datafusion::physical_plan::project_schema;
use datafusion_expr::{Filter, LogicalPlan, TableScan, TableSource};
use prost::Message;
use snafu::{ensure, OptionExt, ResultExt};
use substrait_proto::protobuf::expression::mask_expression::{StructItem, StructSelect};
@@ -144,7 +144,7 @@ impl DFLogicalSubstraitConvertor {
.context(error::ConvertDfSchemaSnafu)?;
let predicate = to_df_expr(ctx, *condition, &schema)?;
LogicalPlan::Filter(Filter { predicate, input })
LogicalPlan::Filter(Filter::try_new(predicate, input).context(DFInternalSnafu)?)
}
RelType::Fetch(_fetch_rel) => UnsupportedPlanSnafu {
name: "Fetch Relation",
@@ -238,7 +238,9 @@ impl DFLogicalSubstraitConvertor {
.context(TableNotFoundSnafu {
name: format!("{}.{}.{}", catalog_name, schema_name, table_name),
})?;
let adapter = Arc::new(DfTableProviderAdapter::new(table_ref));
let adapter = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(table_ref),
)));
// Get schema directly from the table, and compare it with the schema retrieved from substrait proto.
let stored_schema = adapter.schema();
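
Logical plans in this DataFusion version reference the narrower TableSource trait rather than TableProvider, with DefaultTableSource as the stock adapter between the two, so the table adapter gets wrapped a second time. A sketch of the bridging, assuming a provider is already at hand:

    use std::sync::Arc;
    use datafusion::datasource::{DefaultTableSource, TableProvider};
    use datafusion_expr::TableSource;

    fn to_source(provider: Arc<dyn TableProvider>) -> Arc<dyn TableSource> {
        // DefaultTableSource implements TableSource by delegating to the provider
        Arc::new(DefaultTableSource::new(provider))
    }
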
@@ -267,14 +269,14 @@ impl DFLogicalSubstraitConvertor {
ctx.set_df_schema(projected_schema.clone());
// TODO(ruihang): Support limit
// TODO(ruihang): Support limit(fetch)
Ok(LogicalPlan::TableScan(TableScan {
table_name: format!("{}.{}.{}", catalog_name, schema_name, table_name),
source: adapter,
projection,
projected_schema,
filters,
limit: None,
fetch: None,
}))
}
@@ -302,7 +304,7 @@ impl DFLogicalSubstraitConvertor {
.fail()?,
LogicalPlan::Filter(filter) => {
let input = Some(Box::new(
self.logical_plan_to_rel(ctx, filter.input.clone())?,
self.logical_plan_to_rel(ctx, filter.input().clone())?,
));
let schema = plan
@@ -312,7 +314,7 @@ impl DFLogicalSubstraitConvertor {
.context(error::ConvertDfSchemaSnafu)?;
let condition = Some(Box::new(expression_from_df_expr(
ctx,
&filter.predicate,
filter.predicate(),
&schema,
)?));
@@ -368,7 +370,16 @@ impl DFLogicalSubstraitConvertor {
name: "DataFusion Logical Limit",
}
.fail()?,
LogicalPlan::CreateExternalTable(_)
LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::CreateView(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropView(_)
| LogicalPlan::Distinct(_)
| LogicalPlan::SetVariable(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::Values(_)
@@ -485,7 +496,9 @@ impl DFLogicalSubstraitConvertor {
fn same_schema_without_metadata(lhs: &ArrowSchemaRef, rhs: &ArrowSchemaRef) -> bool {
lhs.fields.len() == rhs.fields.len()
&& lhs.fields.iter().zip(rhs.fields.iter()).all(|(x, y)| {
x.name == y.name && x.data_type == y.data_type && x.is_nullable == y.is_nullable
x.name() == y.name()
&& x.data_type() == y.data_type()
&& x.is_nullable() == y.is_nullable()
})
}
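
The comparison switches to accessor methods because Field's members are no longer public in arrow-rs. A self-contained sketch of the same check:

    use arrow::datatypes::{DataType, Field};

    fn same_field(x: &Field, y: &Field) -> bool {
        x.name() == y.name()
            && x.data_type() == y.data_type()
            && x.is_nullable() == y.is_nullable()
    }
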
@@ -494,7 +507,7 @@ mod test {
use catalog::local::{LocalCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogProvider, RegisterTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datafusion::logical_plan::DFSchema;
use datafusion::common::{DFSchema, ToDFSchema};
use datatypes::schema::Schema;
use table::requests::CreateTableRequest;
use table::test_util::{EmptyTable, MockTableEngine};
@@ -564,7 +577,9 @@ mod test {
})
.await
.unwrap();
let adapter = Arc::new(DfTableProviderAdapter::new(table_ref));
let adapter = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(table_ref),
)));
let projection = vec![1, 3, 5];
let df_schema = adapter.schema().to_dfschema().unwrap();
@@ -584,7 +599,7 @@ mod test {
projection: Some(projection),
projected_schema,
filters: vec![],
limit: None,
fetch: None,
});
logical_plan_round_trip(table_scan_plan, catalog_manager).await;


@@ -524,7 +524,9 @@ mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder};
use datatypes::value::Value;
use datatypes::vectors::TimestampMillisecondVector;
use datatypes::vectors::{
Float64Vector, Int32Vector, StringVector, TimestampMillisecondVector, VectorRef,
};
use log_store::fs::noop::NoopLogStore;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;


@@ -21,12 +21,12 @@ pub mod insert;
pub mod query;
pub mod show;
pub mod statement;
use std::str::FromStr;
use api::helper::ColumnDataTypeWrapper;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_time::Timestamp;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
use datatypes::types::DateTimeType;
@@ -79,7 +79,7 @@ fn parse_string_to_value(
data_type: &ConcreteDataType,
) -> Result<Value> {
ensure!(
data_type.stringifiable(),
data_type.is_stringifiable(),
ColumnTypeMismatchSnafu {
column_name,
expect: data_type.clone(),
@@ -112,8 +112,8 @@ fn parse_string_to_value(
ConcreteDataType::Timestamp(t) => {
if let Ok(ts) = Timestamp::from_str(&s) {
Ok(Value::Timestamp(Timestamp::new(
ts.convert_to(t.unit),
t.unit,
ts.convert_to(t.unit()),
t.unit(),
)))
} else {
ParseSqlValueSnafu {
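
TimestampType now exposes its unit through an accessor instead of a public field. A sketch of the parse-then-rescale step, assuming the Timestamp API as used in the hunk (convert_to returning the raw value in the target unit; the TimeUnit import path is an assumption):

    use std::str::FromStr;

    use common_time::timestamp::TimeUnit;
    use common_time::Timestamp;

    fn parse_as_millis(s: &str) -> Option<Timestamp> {
        let ts = Timestamp::from_str(s).ok()?;
        // rescale the parsed value to the desired unit
        Some(Timestamp::new(
            ts.convert_to(TimeUnit::Millisecond),
            TimeUnit::Millisecond,
        ))
    }
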
@@ -301,7 +301,10 @@ pub fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result<Co
SqlDataType::Date => Ok(ConcreteDataType::date_datatype()),
SqlDataType::Custom(obj_name) => match &obj_name.0[..] {
[type_name] => {
if type_name.value.eq_ignore_ascii_case(DateTimeType::name()) {
if type_name
.value
.eq_ignore_ascii_case(DateTimeType::default().name())
{
Ok(ConcreteDataType::datetime_datatype())
} else {
error::SqlTypeNotSupportedSnafu {
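
name is now an instance method on the DataType trait rather than an associated function, hence the DateTimeType::default() receiver. A sketch, assuming DateTimeType implements Default as the hunk implies:

    use datatypes::data_type::DataType;
    use datatypes::types::DateTimeType;

    fn is_datetime_name(type_name: &str) -> bool {
        // the DataType trait must be in scope for .name() to resolve
        type_name.eq_ignore_ascii_case(&DateTimeType::default().name())
    }
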


@@ -98,11 +98,26 @@ pub fn values_to_string(data_type: ColumnDataType, values: Values) -> Vec<String
.into_iter()
.map(|v| v.to_string())
.collect(),
ColumnDataType::Timestamp => values
ColumnDataType::TimestampSecond => values
.ts_second_values
.into_iter()
.map(|v| v.to_string())
.collect(),
ColumnDataType::TimestampMillisecond => values
.ts_millisecond_values
.into_iter()
.map(|v| v.to_string())
.collect(),
ColumnDataType::TimestampMicrosecond => values
.ts_microsecond_values
.into_iter()
.map(|v| v.to_string())
.collect(),
ColumnDataType::TimestampNanosecond => values
.ts_nanosecond_values
.into_iter()
.map(|v| v.to_string())
.collect(),
}
}
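
The single Timestamp variant became four unit-specific variants, each carried in its own repeated field on Values. A compact sketch of dispatching on them, using the types as imported in the first file of this diff:

    use client::api::v1::column::Values;
    use client::api::v1::ColumnDataType;

    fn timestamp_value_count(data_type: ColumnDataType, values: &Values) -> usize {
        match data_type {
            ColumnDataType::TimestampSecond => values.ts_second_values.len(),
            ColumnDataType::TimestampMillisecond => values.ts_millisecond_values.len(),
            ColumnDataType::TimestampMicrosecond => values.ts_microsecond_values.len(),
            ColumnDataType::TimestampNanosecond => values.ts_nanosecond_values.len(),
            _ => 0,
        }
    }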