chore: bump datafusion version to fix last_value regression (#4169)

* chore: bump datafusion version to fix `last_value` regression

* fix: resolve PR comments

* fix ci
This commit is contained in:
LFC
2024-06-19 15:47:17 +08:00
committed by GitHub
parent 22d12683b4
commit cc2f7efb98
15 changed files with 611 additions and 263 deletions

688
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -104,15 +104,15 @@ clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
crossbeam-utils = "0.8"
dashmap = "5.4"
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-optimizer = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-sql = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion-substrait = { git = "https://github.com/apache/datafusion.git", rev = "08e19f4956d32164be6fc66eb5a4c080eb0023d1" }
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-optimizer = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-sql = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
datafusion-substrait = { git = "https://github.com/apache/datafusion.git", rev = "729b356ef543ffcda6813c7b5373507a04ae0109" }
derive_builder = "0.12"
dotenv = "0.15"
# TODO(LFC): Wait for https://github.com/etcdv3/etcd-client/pull/76

View File

@@ -19,7 +19,6 @@ use std::vec;
use common_test_util::find_workspace_path;
use datafusion::assert_batches_eq;
use datafusion::config::TableParquetOptions;
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream, ParquetExec};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -167,8 +166,9 @@ async fn test_parquet_exec() {
.to_string();
let base_config = scan_config(schema.clone(), None, path);
let exec = ParquetExec::new(base_config, None, None, TableParquetOptions::default())
.with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(store)));
let exec = ParquetExec::builder(base_config)
.with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(store)))
.build();
let ctx = SessionContext::new();

View File

@@ -882,7 +882,8 @@ impl TryFrom<ScalarValue> for Value {
| ScalarValue::FixedSizeList(_)
| ScalarValue::LargeList(_)
| ScalarValue::Dictionary(_, _)
| ScalarValue::Union(_, _, _) => {
| ScalarValue::Union(_, _, _)
| ScalarValue::Float16(_) => {
return error::UnsupportedArrowTypeSnafu {
arrow_type: v.data_type(),
}

View File

@@ -239,7 +239,8 @@ impl Helper {
| ScalarValue::FixedSizeList(_)
| ScalarValue::LargeList(_)
| ScalarValue::Dictionary(_, _)
| ScalarValue::Union(_, _, _) => {
| ScalarValue::Union(_, _, _)
| ScalarValue::Float16(_) => {
return error::ConversionSnafu {
from: format!("Unsupported scalar value: {value}"),
}

View File

@@ -22,7 +22,6 @@ use common_datasource::file_format::Format;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::common::{Statistics, ToDFSchema};
use datafusion::config::TableParquetOptions;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream, ParquetExec};
@@ -198,10 +197,15 @@ fn new_parquet_stream_with_exec_plan(
// TODO(ruihang): get this from upper layer
let task_ctx = SessionContext::default().task_ctx();
let parquet_exec = ParquetExec::new(scan_config, filters, None, TableParquetOptions::default())
let mut builder = ParquetExec::builder(scan_config);
if let Some(filters) = filters {
builder = builder.with_predicate(filters);
}
let parquet_exec = builder
.with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(
store.clone(),
)));
)))
.build();
let stream = parquet_exec
.execute(0, task_ctx)
.context(error::ParquetScanPlanSnafu)?;

View File

@@ -24,9 +24,10 @@ use hydroflow::futures::future::Map;
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use substrait::variation_const::{
DATE_32_TYPE_REF, DATE_64_TYPE_REF, DEFAULT_TYPE_REF, TIMESTAMP_MICRO_TYPE_REF,
TIMESTAMP_MILLI_TYPE_REF, TIMESTAMP_NANO_TYPE_REF, TIMESTAMP_SECOND_TYPE_REF,
UNSIGNED_INTEGER_TYPE_REF,
DATE_32_TYPE_VARIATION_REF, DATE_64_TYPE_VARIATION_REF, DEFAULT_TYPE_VARIATION_REF,
TIMESTAMP_MICRO_TYPE_VARIATION_REF, TIMESTAMP_MILLI_TYPE_VARIATION_REF,
TIMESTAMP_NANO_TYPE_VARIATION_REF, TIMESTAMP_SECOND_TYPE_VARIATION_REF,
UNSIGNED_INTEGER_TYPE_VARIATION_REF,
};
use substrait_proto::proto::aggregate_function::AggregationInvocation;
use substrait_proto::proto::aggregate_rel::{Grouping, Measure};

View File

@@ -17,9 +17,10 @@ use common_time::{Date, Timestamp};
use datatypes::data_type::ConcreteDataType as CDT;
use datatypes::value::Value;
use substrait::variation_const::{
DATE_32_TYPE_REF, DATE_64_TYPE_REF, DEFAULT_TYPE_REF, TIMESTAMP_MICRO_TYPE_REF,
TIMESTAMP_MILLI_TYPE_REF, TIMESTAMP_NANO_TYPE_REF, TIMESTAMP_SECOND_TYPE_REF,
UNSIGNED_INTEGER_TYPE_REF,
DATE_32_TYPE_VARIATION_REF, DATE_64_TYPE_VARIATION_REF, DEFAULT_TYPE_VARIATION_REF,
TIMESTAMP_MICRO_TYPE_VARIATION_REF, TIMESTAMP_MILLI_TYPE_VARIATION_REF,
TIMESTAMP_NANO_TYPE_VARIATION_REF, TIMESTAMP_SECOND_TYPE_VARIATION_REF,
UNSIGNED_INTEGER_TYPE_VARIATION_REF,
};
use substrait_proto::proto::expression::literal::LiteralType;
use substrait_proto::proto::expression::Literal;
@@ -33,41 +34,41 @@ pub(crate) fn from_substrait_literal(lit: &Literal) -> Result<(Value, CDT), Erro
let scalar_value = match &lit.literal_type {
Some(LiteralType::Boolean(b)) => (Value::from(*b), CDT::boolean_datatype()),
Some(LiteralType::I8(n)) => match lit.type_variation_reference {
DEFAULT_TYPE_REF => (Value::from(*n as i8), CDT::int8_datatype()),
UNSIGNED_INTEGER_TYPE_REF => (Value::from(*n as u8), CDT::uint8_datatype()),
DEFAULT_TYPE_VARIATION_REF => (Value::from(*n as i8), CDT::int8_datatype()),
UNSIGNED_INTEGER_TYPE_VARIATION_REF => (Value::from(*n as u8), CDT::uint8_datatype()),
others => not_impl_err!("Unknown type variation reference {others}",)?,
},
Some(LiteralType::I16(n)) => match lit.type_variation_reference {
DEFAULT_TYPE_REF => (Value::from(*n as i16), CDT::int16_datatype()),
UNSIGNED_INTEGER_TYPE_REF => (Value::from(*n as u16), CDT::uint16_datatype()),
DEFAULT_TYPE_VARIATION_REF => (Value::from(*n as i16), CDT::int16_datatype()),
UNSIGNED_INTEGER_TYPE_VARIATION_REF => (Value::from(*n as u16), CDT::uint16_datatype()),
others => not_impl_err!("Unknown type variation reference {others}",)?,
},
Some(LiteralType::I32(n)) => match lit.type_variation_reference {
DEFAULT_TYPE_REF => (Value::from(*n), CDT::int32_datatype()),
UNSIGNED_INTEGER_TYPE_REF => (Value::from(*n as u32), CDT::uint32_datatype()),
DEFAULT_TYPE_VARIATION_REF => (Value::from(*n), CDT::int32_datatype()),
UNSIGNED_INTEGER_TYPE_VARIATION_REF => (Value::from(*n as u32), CDT::uint32_datatype()),
others => not_impl_err!("Unknown type variation reference {others}",)?,
},
Some(LiteralType::I64(n)) => match lit.type_variation_reference {
DEFAULT_TYPE_REF => (Value::from(*n), CDT::int64_datatype()),
UNSIGNED_INTEGER_TYPE_REF => (Value::from(*n as u64), CDT::uint64_datatype()),
DEFAULT_TYPE_VARIATION_REF => (Value::from(*n), CDT::int64_datatype()),
UNSIGNED_INTEGER_TYPE_VARIATION_REF => (Value::from(*n as u64), CDT::uint64_datatype()),
others => not_impl_err!("Unknown type variation reference {others}",)?,
},
Some(LiteralType::Fp32(f)) => (Value::from(*f), CDT::float32_datatype()),
Some(LiteralType::Fp64(f)) => (Value::from(*f), CDT::float64_datatype()),
Some(LiteralType::Timestamp(t)) => match lit.type_variation_reference {
TIMESTAMP_SECOND_TYPE_REF => (
TIMESTAMP_SECOND_TYPE_VARIATION_REF => (
Value::from(Timestamp::new_second(*t)),
CDT::timestamp_second_datatype(),
),
TIMESTAMP_MILLI_TYPE_REF => (
TIMESTAMP_MILLI_TYPE_VARIATION_REF => (
Value::from(Timestamp::new_millisecond(*t)),
CDT::timestamp_millisecond_datatype(),
),
TIMESTAMP_MICRO_TYPE_REF => (
TIMESTAMP_MICRO_TYPE_VARIATION_REF => (
Value::from(Timestamp::new_microsecond(*t)),
CDT::timestamp_microsecond_datatype(),
),
TIMESTAMP_NANO_TYPE_REF => (
TIMESTAMP_NANO_TYPE_VARIATION_REF => (
Value::from(Timestamp::new_nanosecond(*t)),
CDT::timestamp_nanosecond_datatype(),
),
@@ -115,37 +116,36 @@ pub fn from_substrait_type(null_type: &substrait_proto::proto::Type) -> Result<C
match kind {
Kind::Bool(_) => Ok(CDT::boolean_datatype()),
Kind::I8(integer) => match integer.type_variation_reference {
DEFAULT_TYPE_REF => Ok(CDT::int8_datatype()),
UNSIGNED_INTEGER_TYPE_REF => Ok(CDT::uint8_datatype()),
DEFAULT_TYPE_VARIATION_REF => Ok(CDT::int8_datatype()),
UNSIGNED_INTEGER_TYPE_VARIATION_REF => Ok(CDT::uint8_datatype()),
v => not_impl_err!("Unsupported Substrait type variation {v} of type {kind:?}"),
},
Kind::I16(integer) => match integer.type_variation_reference {
DEFAULT_TYPE_REF => Ok(CDT::int16_datatype()),
UNSIGNED_INTEGER_TYPE_REF => Ok(CDT::uint16_datatype()),
DEFAULT_TYPE_VARIATION_REF => Ok(CDT::int16_datatype()),
UNSIGNED_INTEGER_TYPE_VARIATION_REF => Ok(CDT::uint16_datatype()),
v => not_impl_err!("Unsupported Substrait type variation {v} of type {kind:?}"),
},
Kind::I32(integer) => match integer.type_variation_reference {
DEFAULT_TYPE_REF => Ok(CDT::int32_datatype()),
UNSIGNED_INTEGER_TYPE_REF => Ok(CDT::uint32_datatype()),
DEFAULT_TYPE_VARIATION_REF => Ok(CDT::int32_datatype()),
UNSIGNED_INTEGER_TYPE_VARIATION_REF => Ok(CDT::uint32_datatype()),
v => not_impl_err!("Unsupported Substrait type variation {v} of type {kind:?}"),
},
Kind::I64(integer) => match integer.type_variation_reference {
DEFAULT_TYPE_REF => Ok(CDT::int64_datatype()),
UNSIGNED_INTEGER_TYPE_REF => Ok(CDT::uint64_datatype()),
DEFAULT_TYPE_VARIATION_REF => Ok(CDT::int64_datatype()),
UNSIGNED_INTEGER_TYPE_VARIATION_REF => Ok(CDT::uint64_datatype()),
v => not_impl_err!("Unsupported Substrait type variation {v} of type {kind:?}"),
},
Kind::Fp32(_) => Ok(CDT::float32_datatype()),
Kind::Fp64(_) => Ok(CDT::float64_datatype()),
Kind::Timestamp(ts) => match ts.type_variation_reference {
TIMESTAMP_SECOND_TYPE_REF => Ok(CDT::timestamp_second_datatype()),
TIMESTAMP_MILLI_TYPE_REF => Ok(CDT::timestamp_millisecond_datatype()),
TIMESTAMP_MICRO_TYPE_REF => Ok(CDT::timestamp_microsecond_datatype()),
TIMESTAMP_NANO_TYPE_REF => Ok(CDT::timestamp_nanosecond_datatype()),
TIMESTAMP_SECOND_TYPE_VARIATION_REF => Ok(CDT::timestamp_second_datatype()),
TIMESTAMP_MILLI_TYPE_VARIATION_REF => Ok(CDT::timestamp_millisecond_datatype()),
TIMESTAMP_MICRO_TYPE_VARIATION_REF => Ok(CDT::timestamp_microsecond_datatype()),
TIMESTAMP_NANO_TYPE_VARIATION_REF => Ok(CDT::timestamp_nanosecond_datatype()),
v => not_impl_err!("Unsupported Substrait type variation {v} of type {kind:?}"),
},
Kind::Date(date) => match date.type_variation_reference {
DATE_32_TYPE_REF => Ok(CDT::date_datatype()),
DATE_64_TYPE_REF => Ok(CDT::date_datatype()),
DATE_32_TYPE_VARIATION_REF | DATE_64_TYPE_VARIATION_REF => Ok(CDT::date_datatype()),
v => not_impl_err!("Unsupported Substrait type variation {v} of type {kind:?}"),
},
Kind::Binary(_) => Ok(CDT::binary_datatype()),

View File

@@ -22,6 +22,7 @@ use common_query::prelude::GREPTIME_VALUE;
use datafusion::common::{DFSchemaRef, Result as DfResult};
use datafusion::datasource::DefaultTableSource;
use datafusion::execution::context::SessionState;
use datafusion::functions_aggregate::sum;
use datafusion::logical_expr::expr::{
AggregateFunction, AggregateFunctionDefinition, Alias, ScalarFunction,
};
@@ -1379,14 +1380,18 @@ impl PromPlanner {
input_plan: &LogicalPlan,
) -> Result<Vec<DfExpr>> {
let aggr = match op.id() {
token::T_SUM => AggregateFunctionEnum::Sum,
token::T_AVG => AggregateFunctionEnum::Avg,
token::T_COUNT => AggregateFunctionEnum::Count,
token::T_MIN => AggregateFunctionEnum::Min,
token::T_MAX => AggregateFunctionEnum::Max,
token::T_GROUP => AggregateFunctionEnum::Grouping,
token::T_STDDEV => AggregateFunctionEnum::StddevPop,
token::T_STDVAR => AggregateFunctionEnum::VariancePop,
token::T_SUM => AggregateFunctionDefinition::UDF(sum::sum_udaf()),
token::T_AVG => AggregateFunctionDefinition::BuiltIn(AggregateFunctionEnum::Avg),
token::T_COUNT => AggregateFunctionDefinition::BuiltIn(AggregateFunctionEnum::Count),
token::T_MIN => AggregateFunctionDefinition::BuiltIn(AggregateFunctionEnum::Min),
token::T_MAX => AggregateFunctionDefinition::BuiltIn(AggregateFunctionEnum::Max),
token::T_GROUP => AggregateFunctionDefinition::BuiltIn(AggregateFunctionEnum::Grouping),
token::T_STDDEV => {
AggregateFunctionDefinition::BuiltIn(AggregateFunctionEnum::StddevPop)
}
token::T_STDVAR => {
AggregateFunctionDefinition::BuiltIn(AggregateFunctionEnum::VariancePop)
}
token::T_TOPK | token::T_BOTTOMK | token::T_COUNT_VALUES | token::T_QUANTILE => {
UnsupportedExprSnafu {
name: format!("{op:?}"),
@@ -1403,7 +1408,7 @@ impl PromPlanner {
.iter()
.map(|col| {
DfExpr::AggregateFunction(AggregateFunction {
func_def: AggregateFunctionDefinition::BuiltIn(aggr.clone()),
func_def: aggr.clone(),
args: vec![DfExpr::Column(Column::from_name(col))],
distinct: false,
filter: None,

View File

@@ -64,6 +64,38 @@ SELECT ISNULL('string');
| false |
+------------------------+
SELECT FIRST_VALUE(1);
+-----------------------+
| first_value(Int64(1)) |
+-----------------------+
| 1 |
+-----------------------+
SELECT FIRST_VALUE('a');
+------------------------+
| first_value(Utf8("a")) |
+------------------------+
| a |
+------------------------+
SELECT LAST_VALUE(1);
+----------------------+
| last_value(Int64(1)) |
+----------------------+
| 1 |
+----------------------+
SELECT LAST_VALUE('a');
+-----------------------+
| last_value(Utf8("a")) |
+-----------------------+
| a |
+-----------------------+
DROP TABLE t;
Affected Rows: 0

View File

@@ -16,4 +16,12 @@ SELECT ISNULL(true);
SELECT ISNULL('string');
SELECT FIRST_VALUE(1);
SELECT FIRST_VALUE('a');
SELECT LAST_VALUE(1);
SELECT LAST_VALUE('a');
DROP TABLE t;

View File

@@ -231,7 +231,11 @@ SELECT ts, host, count(distinct *) RANGE '5s' FROM host ALIGN '5s' ORDER BY host
-- Test error first_value/last_value
SELECT ts, host, first_value(val, val) RANGE '5s' FROM host ALIGN '5s' ORDER BY host, ts;
Error: 3001(EngineExecuteQuery), DataFusion error: Error during planning: Coercion from [Int64, Int64] to the signature OneOf([ArraySignature(Array), Uniform(1, [Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64, Float32, Float64])]) failed.
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Error during planning: Coercion from [Int64, Int64] to the signature OneOf([ArraySignature(Array), Numeric(1), Uniform(1, [Utf8])]) failed. and No function matches the given name and argument types 'first_value(Int64, Int64)'. You might need to add explicit type casts.
Candidate functions:
first_value(array)
first_value(Numeric(1))
first_value(Utf8)
DROP TABLE host;

View File

@@ -5,7 +5,7 @@ SELECT arrow_typeof(FIRST_VALUE('0.1'::DECIMAL(4,1)));
+----------------------------------------+
| arrow_typeof(first_value(Utf8("0.1"))) |
+----------------------------------------+
| Float64 |
| Decimal128(4, 1) |
+----------------------------------------+
-- first_value
@@ -18,7 +18,7 @@ SELECT FIRST_VALUE(NULL::DECIMAL),
+-------------------+--------------------------+--------------------------------+------------------------------------------+--------------------------------------------------------------+
| first_value(NULL) | first_value(Utf8("0.1")) | first_value(Utf8("4938245.1")) | first_value(Utf8("45672564564938245.1")) | first_value(Utf8("4567645908450368043562342564564938245.1")) |
+-------------------+--------------------------+--------------------------------+------------------------------------------+--------------------------------------------------------------+
| | 0.1 | 4938245.1 | 4.567256456493825e16 | 4.567645908450368e36 |
| | 0.1 | 4938245.1 | 45672564564938245.1 | 4567645908450368043562342564564938245.1 |
+-------------------+--------------------------+--------------------------------+------------------------------------------+--------------------------------------------------------------+
-- min

View File

@@ -277,9 +277,9 @@ This was likely caused by a bug in DataFusion's code and we would welcome that y
SELECT SUM(interval_value) from intervals;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: No function matches the given name and argument types 'SUM(Interval(MonthDayNano))'. You might need to add explicit type casts.
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Execution error: User-defined coercion failed with Execution("Sum not supported for Interval(MonthDayNano)") and No function matches the given name and argument types 'SUM(Interval(MonthDayNano))'. You might need to add explicit type casts.
Candidate functions:
SUM(Int8/Int16/Int32/Int64/UInt8/UInt16/UInt32/UInt64/Float32/Float64)
SUM(UserDefined)
SELECT AVG(interval_value) from intervals;

View File

@@ -75,9 +75,9 @@ SELECT MAX(t) FROM timestamp;
SELECT SUM(t) FROM timestamp;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: No function matches the given name and argument types 'SUM(Timestamp(Millisecond, None))'. You might need to add explicit type casts.
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Execution error: User-defined coercion failed with Execution("Sum not supported for Timestamp(Millisecond, None)") and No function matches the given name and argument types 'SUM(Timestamp(Millisecond, None))'. You might need to add explicit type casts.
Candidate functions:
SUM(Int8/Int16/Int32/Int64/UInt8/UInt16/UInt32/UInt64/Float32/Float64)
SUM(UserDefined)
SELECT AVG(t) FROM timestamp;