diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs
index 0ca1f33182..f39b48c87e 100644
--- a/benchmarks/src/bin/nyc-taxi.rs
+++ b/benchmarks/src/bin/nyc-taxi.rs
@@ -20,7 +20,6 @@
 use std::collections::HashMap;
 use std::path::{Path, PathBuf};
-use std::sync::Arc;
 use std::time::Instant;
 
 use arrow::array::{ArrayRef, PrimitiveArray, StringArray, TimestampNanosecondArray};
@@ -32,9 +31,7 @@ use client::api::v1::column::Values;
 use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateExpr, InsertExpr};
 use client::{Client, Database, Select};
 use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
-use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
-use parquet::file::reader::FileReader;
-use parquet::file::serialized_reader::SerializedFileReader;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 use tokio::task::JoinSet;
 
 const DATABASE_NAME: &str = "greptime";
@@ -86,10 +83,14 @@ async fn write_data(
     pb_style: ProgressStyle,
 ) -> u128 {
     let file = std::fs::File::open(&path).unwrap();
-    let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
-    let row_num = file_reader.metadata().file_metadata().num_rows();
-    let record_batch_reader = ParquetFileArrowReader::new(file_reader)
-        .get_record_reader(batch_size)
+    let record_batch_reader_builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+    let row_num = record_batch_reader_builder
+        .metadata()
+        .file_metadata()
+        .num_rows();
+    let record_batch_reader = record_batch_reader_builder
+        .with_batch_size(batch_size)
+        .build()
         .unwrap();
     let progress_bar = mpb.add(ProgressBar::new(row_num as _));
     progress_bar.set_style(pb_style);
@@ -210,9 +211,10 @@ fn build_values(column: &ArrayRef) -> Values {
         | DataType::FixedSizeList(_, _)
         | DataType::LargeList(_)
         | DataType::Struct(_)
-        | DataType::Union(_, _)
+        | DataType::Union(_, _, _)
         | DataType::Dictionary(_, _)
-        | DataType::Decimal(_, _)
+        | DataType::Decimal128(_, _)
+        | DataType::Decimal256(_, _)
         | DataType::Map(_, _) => todo!(),
     }
 }
diff --git a/src/common/substrait/src/context.rs b/src/common/substrait/src/context.rs
index b017e9cc9a..af4a07b788 100644
--- a/src/common/substrait/src/context.rs
+++ b/src/common/substrait/src/context.rs
@@ -14,7 +14,7 @@
 
 use std::collections::HashMap;
 
-use datafusion::logical_plan::DFSchemaRef;
+use datafusion::common::DFSchemaRef;
 use substrait_proto::protobuf::extensions::simple_extension_declaration::{
     ExtensionFunction, MappingType,
 };
diff --git a/src/common/substrait/src/df_expr.rs b/src/common/substrait/src/df_expr.rs
index d924e7b085..b8d77a113c 100644
--- a/src/common/substrait/src/df_expr.rs
+++ b/src/common/substrait/src/df_expr.rs
@@ -15,8 +15,8 @@
 use std::collections::VecDeque;
 use std::str::FromStr;
 
-use datafusion::logical_plan::{Column, Expr};
-use datafusion_expr::{expr_fn, lit, BuiltinScalarFunction, Operator};
+use datafusion::common::Column;
+use datafusion_expr::{expr_fn, lit, Between, BinaryExpr, BuiltinScalarFunction, Expr, Operator};
 use datatypes::schema::Schema;
 use snafu::{ensure, OptionExt};
 use substrait_proto::protobuf::expression::field_reference::ReferenceType as FieldReferenceType;
@@ -311,21 +311,21 @@ pub fn convert_scalar_function(
         // skip GetIndexedField, unimplemented.
"between" => { ensure_arg_len(3)?; - Expr::Between { + Expr::Between(Between { expr: Box::new(inputs.pop_front().unwrap()), negated: false, low: Box::new(inputs.pop_front().unwrap()), high: Box::new(inputs.pop_front().unwrap()), - } + }) } "not_between" => { ensure_arg_len(3)?; - Expr::Between { + Expr::Between(Between { expr: Box::new(inputs.pop_front().unwrap()), negated: true, low: Box::new(inputs.pop_front().unwrap()), high: Box::new(inputs.pop_front().unwrap()), - } + }) } // skip Case, is covered in substrait::SwitchExpression. // skip Cast and TryCast, is covered in substrait::Cast. @@ -477,7 +477,7 @@ pub fn expression_from_df_expr( rex_type: Some(RexType::Literal(l)), } } - Expr::BinaryExpr { left, op, right } => { + Expr::BinaryExpr(BinaryExpr { left, op, right }) => { let left = expression_from_df_expr(ctx, left, schema)?; let right = expression_from_df_expr(ctx, right, schema)?; let arguments = utils::expression_to_argument(vec![left, right]); @@ -518,12 +518,12 @@ pub fn expression_from_df_expr( name: expr.to_string(), } .fail()?, - Expr::Between { + Expr::Between(Between { expr, negated, low, high, - } => { + }) => { let expr = expression_from_df_expr(ctx, expr, schema)?; let low = expression_from_df_expr(ctx, low, schema)?; let high = expression_from_df_expr(ctx, high, schema)?; @@ -564,7 +564,21 @@ pub fn expression_from_df_expr( | Expr::WindowFunction { .. } | Expr::AggregateUDF { .. } | Expr::InList { .. } - | Expr::Wildcard => UnsupportedExprSnafu { + | Expr::Wildcard + | Expr::Like(_) + | Expr::ILike(_) + | Expr::SimilarTo(_) + | Expr::IsTrue(_) + | Expr::IsFalse(_) + | Expr::IsUnknown(_) + | Expr::IsNotTrue(_) + | Expr::IsNotFalse(_) + | Expr::IsNotUnknown(_) + | Expr::Exists { .. } + | Expr::InSubquery { .. } + | Expr::ScalarSubquery(..) + | Expr::QualifiedWildcard { .. 
} => todo!(), + Expr::GroupingSet(_) => UnsupportedExprSnafu { name: expr.to_string(), } .fail()?, @@ -628,6 +642,10 @@ mod utils { Operator::RegexNotIMatch => "regex_not_i_match", Operator::BitwiseAnd => "bitwise_and", Operator::BitwiseOr => "bitwise_or", + Operator::BitwiseXor => "bitwise_xor", + Operator::BitwiseShiftRight => "bitwise_shift_right", + Operator::BitwiseShiftLeft => "bitwise_shift_left", + Operator::StringConcat => "string_concat", } } @@ -679,7 +697,6 @@ mod utils { BuiltinScalarFunction::Sqrt => "sqrt", BuiltinScalarFunction::Tan => "tan", BuiltinScalarFunction::Trunc => "trunc", - BuiltinScalarFunction::Array => "make_array", BuiltinScalarFunction::Ascii => "ascii", BuiltinScalarFunction::BitLength => "bit_length", BuiltinScalarFunction::Btrim => "btrim", @@ -723,6 +740,17 @@ mod utils { BuiltinScalarFunction::Trim => "trim", BuiltinScalarFunction::Upper => "upper", BuiltinScalarFunction::RegexpMatch => "regexp_match", + BuiltinScalarFunction::Atan2 => "atan2", + BuiltinScalarFunction::Coalesce => "coalesce", + BuiltinScalarFunction::Power => "power", + BuiltinScalarFunction::MakeArray => "make_array", + BuiltinScalarFunction::DateBin => "date_bin", + BuiltinScalarFunction::FromUnixtime => "from_unixtime", + BuiltinScalarFunction::CurrentDate => "current_date", + BuiltinScalarFunction::CurrentTime => "current_time", + BuiltinScalarFunction::Uuid => "uuid", + BuiltinScalarFunction::Struct => "struct", + BuiltinScalarFunction::ArrowTypeof => "arrow_type_of", } } } diff --git a/src/common/substrait/src/df_logical.rs b/src/common/substrait/src/df_logical.rs index 81909cf38d..43524fe045 100644 --- a/src/common/substrait/src/df_logical.rs +++ b/src/common/substrait/src/df_logical.rs @@ -19,10 +19,10 @@ use catalog::CatalogManagerRef; use common_error::prelude::BoxedError; use common_telemetry::debug; use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; -use datafusion::datasource::TableProvider; -use datafusion::logical_plan::plan::Filter; -use datafusion::logical_plan::{LogicalPlan, TableScan, ToDFSchema}; +use datafusion::common::ToDFSchema; +use datafusion::datasource::DefaultTableSource; use datafusion::physical_plan::project_schema; +use datafusion_expr::{Filter, LogicalPlan, TableScan, TableSource}; use prost::Message; use snafu::{ensure, OptionExt, ResultExt}; use substrait_proto::protobuf::expression::mask_expression::{StructItem, StructSelect}; @@ -144,7 +144,7 @@ impl DFLogicalSubstraitConvertor { .context(error::ConvertDfSchemaSnafu)?; let predicate = to_df_expr(ctx, *condition, &schema)?; - LogicalPlan::Filter(Filter { predicate, input }) + LogicalPlan::Filter(Filter::try_new(predicate, input).context(DFInternalSnafu)?) } RelType::Fetch(_fetch_rel) => UnsupportedPlanSnafu { name: "Fetch Relation", @@ -238,7 +238,9 @@ impl DFLogicalSubstraitConvertor { .context(TableNotFoundSnafu { name: format!("{}.{}.{}", catalog_name, schema_name, table_name), })?; - let adapter = Arc::new(DfTableProviderAdapter::new(table_ref)); + let adapter = Arc::new(DefaultTableSource::new(Arc::new( + DfTableProviderAdapter::new(table_ref), + ))); // Get schema directly from the table, and compare it with the schema retrieved from substrait proto. 
         let stored_schema = adapter.schema();
@@ -267,14 +269,14 @@
 
         ctx.set_df_schema(projected_schema.clone());
 
-        // TODO(ruihang): Support limit(fetch)
+        // TODO(ruihang): Support limit (fetch)
         Ok(LogicalPlan::TableScan(TableScan {
             table_name: format!("{}.{}.{}", catalog_name, schema_name, table_name),
             source: adapter,
             projection,
             projected_schema,
             filters,
-            limit: None,
+            fetch: None,
         }))
     }
 
@@ -302,7 +304,7 @@
             .fail()?,
             LogicalPlan::Filter(filter) => {
                 let input = Some(Box::new(
-                    self.logical_plan_to_rel(ctx, filter.input.clone())?,
+                    self.logical_plan_to_rel(ctx, filter.input().clone())?,
                 ));
 
                 let schema = plan
@@ -312,7 +314,7 @@
                     .context(error::ConvertDfSchemaSnafu)?;
 
                 let condition = Some(Box::new(expression_from_df_expr(
                     ctx,
-                    &filter.predicate,
+                    filter.predicate(),
                     &schema,
                 )?));
@@ -368,7 +370,16 @@
                 name: "DataFusion Logical Limit",
             }
             .fail()?,
-            LogicalPlan::CreateExternalTable(_)
+
+            LogicalPlan::Subquery(_)
+            | LogicalPlan::SubqueryAlias(_)
+            | LogicalPlan::CreateView(_)
+            | LogicalPlan::CreateCatalogSchema(_)
+            | LogicalPlan::CreateCatalog(_)
+            | LogicalPlan::DropView(_)
+            | LogicalPlan::Distinct(_)
+            | LogicalPlan::SetVariable(_)
+            | LogicalPlan::CreateExternalTable(_)
             | LogicalPlan::CreateMemoryTable(_)
             | LogicalPlan::DropTable(_)
             | LogicalPlan::Values(_)
@@ -485,7 +496,9 @@
 fn same_schema_without_metadata(lhs: &ArrowSchemaRef, rhs: &ArrowSchemaRef) -> bool {
     lhs.fields.len() == rhs.fields.len()
         && lhs.fields.iter().zip(rhs.fields.iter()).all(|(x, y)| {
-            x.name == y.name && x.data_type == y.data_type && x.is_nullable == y.is_nullable
+            x.name() == y.name()
+                && x.data_type() == y.data_type()
+                && x.is_nullable() == y.is_nullable()
         })
 }
@@ -494,7 +507,7 @@ mod test {
     use catalog::local::{LocalCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
     use catalog::{CatalogList, CatalogProvider, RegisterTableRequest};
     use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
-    use datafusion::logical_plan::DFSchema;
+    use datafusion::common::{DFSchema, ToDFSchema};
     use datatypes::schema::Schema;
     use table::requests::CreateTableRequest;
     use table::test_util::{EmptyTable, MockTableEngine};
@@ -564,7 +577,9 @@ mod test {
         })
         .await
         .unwrap();
-        let adapter = Arc::new(DfTableProviderAdapter::new(table_ref));
+        let adapter = Arc::new(DefaultTableSource::new(Arc::new(
+            DfTableProviderAdapter::new(table_ref),
+        )));
 
         let projection = vec![1, 3, 5];
         let df_schema = adapter.schema().to_dfschema().unwrap();
@@ -584,7 +599,7 @@ mod test {
             projection: Some(projection),
             projected_schema,
             filters: vec![],
-            limit: None,
+            fetch: None,
         });
 
         logical_plan_round_trip(table_scan_plan, catalog_manager).await;
diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs
index 264eab75a7..92709b1b49 100644
--- a/src/mito/src/engine.rs
+++ b/src/mito/src/engine.rs
@@ -524,7 +524,9 @@ mod tests {
     use datatypes::prelude::ConcreteDataType;
     use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder};
     use datatypes::value::Value;
-    use datatypes::vectors::TimestampMillisecondVector;
+    use datatypes::vectors::{
+        Float64Vector, Int32Vector, StringVector, TimestampMillisecondVector, VectorRef,
+    };
     use log_store::fs::noop::NoopLogStore;
     use storage::config::EngineConfig as StorageEngineConfig;
     use storage::EngineImpl;
diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs
index 183bafb671..b88f9380fa 100644
--- a/src/sql/src/statements.rs
+++ b/src/sql/src/statements.rs
@@ -21,12 +21,12 @@ pub mod insert;
 pub mod query;
 pub mod show;
 pub mod statement;
-
 use std::str::FromStr;
 
 use api::helper::ColumnDataTypeWrapper;
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_time::Timestamp;
+use datatypes::data_type::DataType;
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
 use datatypes::types::DateTimeType;
@@ -79,7 +79,7 @@ fn parse_string_to_value(
     data_type: &ConcreteDataType,
 ) -> Result<Value> {
     ensure!(
-        data_type.stringifiable(),
+        data_type.is_stringifiable(),
         ColumnTypeMismatchSnafu {
             column_name,
             expect: data_type.clone(),
@@ -112,8 +112,8 @@
         ConcreteDataType::Timestamp(t) => {
             if let Ok(ts) = Timestamp::from_str(&s) {
                 Ok(Value::Timestamp(Timestamp::new(
-                    ts.convert_to(t.unit),
-                    t.unit,
+                    ts.convert_to(t.unit()),
+                    t.unit(),
                 )))
             } else {
                 ParseSqlValueSnafu {
@@ -301,7 +304,10 @@ pub fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result<ConcreteDataType> {
         SqlDataType::Date => Ok(ConcreteDataType::date_datatype()),
         SqlDataType::Custom(obj_name) => match &obj_name.0[..] {
             [type_name] => {
-                if type_name.value.eq_ignore_ascii_case(DateTimeType::name()) {
+                if type_name
+                    .value
+                    .eq_ignore_ascii_case(DateTimeType::default().name())
+                {
                     Ok(ConcreteDataType::datetime_datatype())
                 } else {
                     error::SqlTypeNotSupportedSnafu {
diff --git a/tests/runner/src/util.rs b/tests/runner/src/util.rs
index 85acbfd671..6c42d4391d 100644
--- a/tests/runner/src/util.rs
+++ b/tests/runner/src/util.rs
@@ -98,11 +98,26 @@ pub fn values_to_string(data_type: ColumnDataType, values: Values) -> Vec<String> {
-        ColumnDataType::Timestamp => values
+        ColumnDataType::TimestampSecond => values
+            .ts_second_values
+            .into_iter()
+            .map(|v| v.to_string())
+            .collect(),
+        ColumnDataType::TimestampMillisecond => values
             .ts_millisecond_values
             .into_iter()
             .map(|v| v.to_string())
             .collect(),
+        ColumnDataType::TimestampMicrosecond => values
+            .ts_microsecond_values
+            .into_iter()
+            .map(|v| v.to_string())
+            .collect(),
+        ColumnDataType::TimestampNanosecond => values
+            .ts_nanosecond_values
+            .into_iter()
+            .map(|v| v.to_string())
+            .collect(),
     }
 }