chore: update datafusion family (#6675)

* chore: update datafusion family

Signed-off-by: luofucong <luofc@foxmail.com>

* fix ci

Signed-off-by: luofucong <luofc@foxmail.com>

* use official otel-arrow-rust

Signed-off-by: luofucong <luofc@foxmail.com>

* rebase

Signed-off-by: luofucong <luofc@foxmail.com>

* use the official orc-rust

Signed-off-by: luofucong <luofc@foxmail.com>

* resolve PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

* remove the empty lines

Signed-off-by: luofucong <luofc@foxmail.com>

* try following PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
LFC
2025-08-15 20:41:49 +08:00
committed by GitHub
parent dfc29eb3b3
commit f9d2a89a0c
198 changed files with 4932 additions and 4196 deletions

View File

@@ -25,19 +25,17 @@ common-error.workspace = true
common-macro.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
datafusion.workspace = true
datafusion-orc.workspace = true
datatypes.workspace = true
derive_builder.workspace = true
futures.workspace = true
lazy_static.workspace = true
object-store.workspace = true
object_store_opendal.workspace = true
orc-rust = { git = "https://github.com/datafusion-contrib/orc-rust", rev = "3134cab581a8e91b942d6a23aca2916ea965f6bb", default-features = false, features = [
"async",
] }
orc-rust = { version = "0.6.3", default-features = false, features = ["async"] }
parquet.workspace = true
paste.workspace = true
rand.workspace = true
regex = "1.7"
serde.workspace = true
snafu.workspace = true
@@ -47,6 +45,4 @@ tokio-util.workspace = true
url = "2.3"
[dev-dependencies]
common-telemetry.workspace = true
common-test-util.workspace = true
uuid.workspace = true

View File

@@ -12,16 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use arrow_schema::{ArrowError, Schema, SchemaRef};
use arrow_schema::Schema;
use async_trait::async_trait;
use bytes::Bytes;
use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
use datafusion::error::{DataFusionError, Result as DfResult};
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures::FutureExt;
use object_store::ObjectStore;
use orc_rust::arrow_reader::ArrowReaderBuilder;
use orc_rust::async_arrow_reader::ArrowStreamReader;
@@ -97,67 +92,6 @@ impl FileFormat for OrcFormat {
}
}
#[derive(Debug, Clone)]
pub struct OrcOpener {
object_store: Arc<ObjectStore>,
output_schema: SchemaRef,
projection: Option<Vec<usize>>,
}
impl OrcOpener {
pub fn new(
object_store: ObjectStore,
output_schema: SchemaRef,
projection: Option<Vec<usize>>,
) -> Self {
Self {
object_store: Arc::from(object_store),
output_schema,
projection,
}
}
}
impl FileOpener for OrcOpener {
fn open(&self, meta: FileMeta) -> DfResult<FileOpenFuture> {
let object_store = self.object_store.clone();
let projected_schema = if let Some(projection) = &self.projection {
let projected_schema = self
.output_schema
.project(projection)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
Arc::new(projected_schema)
} else {
self.output_schema.clone()
};
let projection = self.projection.clone();
Ok(Box::pin(async move {
let path = meta.location().to_string();
let meta = object_store
.stat(&path)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let reader = object_store
.reader(&path)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let stream_reader =
new_orc_stream_reader(ReaderAdapter::new(reader, meta.content_length()))
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let stream =
RecordBatchStreamTypeAdapter::new(projected_schema, stream_reader, projection);
let adopted = stream.map_err(|e| ArrowError::ExternalError(Box::new(e)));
Ok(adopted.boxed())
}))
}
}
#[cfg(test)]
mod tests {
use common_test_util::find_workspace_path;

View File

@@ -31,6 +31,7 @@ use datatypes::schema::SchemaRef;
use futures::future::BoxFuture;
use futures::StreamExt;
use object_store::{FuturesAsyncReader, ObjectStore};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
@@ -65,7 +66,7 @@ impl FileFormat for ParquetFormat {
.compat();
let metadata = reader
.get_metadata()
.get_metadata(None)
.await
.context(error::ReadParquetSnafuSnafu)?;
@@ -146,7 +147,7 @@ impl LazyParquetFileReader {
impl AsyncFileReader for LazyParquetFileReader {
fn get_bytes(
&mut self,
range: std::ops::Range<usize>,
range: std::ops::Range<u64>,
) -> BoxFuture<'_, ParquetResult<bytes::Bytes>> {
Box::pin(async move {
self.maybe_initialize()
@@ -157,13 +158,16 @@ impl AsyncFileReader for LazyParquetFileReader {
})
}
fn get_metadata(&mut self) -> BoxFuture<'_, ParquetResult<Arc<ParquetMetaData>>> {
fn get_metadata<'a>(
&'a mut self,
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
self.maybe_initialize()
.await
.map_err(|e| ParquetError::External(Box::new(e)))?;
// Safety: Must initialized
self.reader.as_mut().unwrap().get_metadata().await
self.reader.as_mut().unwrap().get_metadata(options).await
})
}
}

View File

@@ -19,35 +19,39 @@ use std::vec;
use common_test_util::find_workspace_path;
use datafusion::assert_batches_eq;
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::datasource::physical_plan::{
CsvConfig, CsvOpener, FileOpener, FileScanConfig, FileStream, JsonOpener, ParquetExec,
CsvSource, FileScanConfig, FileSource, FileStream, JsonSource, ParquetSource,
};
use datafusion::datasource::source::DataSourceExec;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_orc::OrcSource;
use futures::StreamExt;
use object_store::ObjectStore;
use super::FORMAT_TYPE;
use crate::file_format::orc::{OrcFormat, OrcOpener};
use crate::file_format::parquet::DefaultParquetFileReaderFactory;
use crate::file_format::{FileFormat, Format};
use crate::file_format::{FileFormat, Format, OrcFormat};
use crate::test_util::{scan_config, test_basic_schema, test_store};
use crate::{error, test_util};
struct Test<'a, T: FileOpener> {
struct Test<'a> {
config: FileScanConfig,
opener: T,
file_source: Arc<dyn FileSource>,
expected: Vec<&'a str>,
}
impl<T: FileOpener> Test<'_, T> {
pub async fn run(self) {
impl Test<'_> {
async fn run(self, store: &ObjectStore) {
let store = Arc::new(object_store_opendal::OpendalStore::new(store.clone()));
let file_opener = self.file_source.create_file_opener(store, &self.config, 0);
let result = FileStream::new(
&self.config,
0,
self.opener,
file_opener,
&ExecutionPlanMetricsSet::new(),
)
.unwrap()
@@ -62,26 +66,16 @@ impl<T: FileOpener> Test<'_, T> {
#[tokio::test]
async fn test_json_opener() {
let store = test_store("/");
let store = Arc::new(object_store_opendal::OpendalStore::new(store));
let schema = test_basic_schema();
let json_opener = || {
JsonOpener::new(
test_util::TEST_BATCH_SIZE,
schema.clone(),
FileCompressionType::UNCOMPRESSED,
store.clone(),
)
};
let file_source = Arc::new(JsonSource::new()).with_batch_size(test_util::TEST_BATCH_SIZE);
let path = &find_workspace_path("/src/common/datasource/tests/json/basic.json")
.display()
.to_string();
let tests = [
Test {
config: scan_config(schema.clone(), None, path),
opener: json_opener(),
config: scan_config(schema.clone(), None, path, file_source.clone()),
file_source: file_source.clone(),
expected: vec![
"+-----+-------+",
"| num | str |",
@@ -93,8 +87,8 @@ async fn test_json_opener() {
],
},
Test {
config: scan_config(schema.clone(), Some(1), path),
opener: json_opener(),
config: scan_config(schema, Some(1), path, file_source.clone()),
file_source,
expected: vec![
"+-----+------+",
"| num | str |",
@@ -106,37 +100,26 @@ async fn test_json_opener() {
];
for test in tests {
test.run().await;
test.run(&store).await;
}
}
#[tokio::test]
async fn test_csv_opener() {
let store = test_store("/");
let store = Arc::new(object_store_opendal::OpendalStore::new(store));
let schema = test_basic_schema();
let path = &find_workspace_path("/src/common/datasource/tests/csv/basic.csv")
.display()
.to_string();
let csv_config = Arc::new(CsvConfig::new(
test_util::TEST_BATCH_SIZE,
schema.clone(),
None,
true,
b',',
b'"',
None,
store,
None,
));
let csv_opener = || CsvOpener::new(csv_config.clone(), FileCompressionType::UNCOMPRESSED);
let file_source = CsvSource::new(true, b',', b'"')
.with_batch_size(test_util::TEST_BATCH_SIZE)
.with_schema(schema.clone());
let tests = [
Test {
config: scan_config(schema.clone(), None, path),
opener: csv_opener(),
config: scan_config(schema.clone(), None, path, file_source.clone()),
file_source: file_source.clone(),
expected: vec![
"+-----+-------+",
"| num | str |",
@@ -148,8 +131,8 @@ async fn test_csv_opener() {
],
},
Test {
config: scan_config(schema.clone(), Some(1), path),
opener: csv_opener(),
config: scan_config(schema, Some(1), path, file_source.clone()),
file_source,
expected: vec![
"+-----+------+",
"| num | str |",
@@ -161,7 +144,7 @@ async fn test_csv_opener() {
];
for test in tests {
test.run().await;
test.run(&store).await;
}
}
@@ -174,12 +157,12 @@ async fn test_parquet_exec() {
let path = &find_workspace_path("/src/common/datasource/tests/parquet/basic.parquet")
.display()
.to_string();
let base_config = scan_config(schema.clone(), None, path);
let exec = ParquetExec::builder(base_config)
.with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(store)))
.build();
let parquet_source = ParquetSource::default()
.with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(store)));
let config = scan_config(schema, None, path, Arc::new(parquet_source));
let exec = DataSourceExec::from_data_source(config);
let ctx = SessionContext::new();
let context = Arc::new(TaskContext::from(&ctx));
@@ -208,20 +191,18 @@ async fn test_parquet_exec() {
#[tokio::test]
async fn test_orc_opener() {
let root = find_workspace_path("/src/common/datasource/tests/orc")
let path = &find_workspace_path("/src/common/datasource/tests/orc/test.orc")
.display()
.to_string();
let store = test_store(&root);
let schema = OrcFormat.infer_schema(&store, "test.orc").await.unwrap();
let schema = Arc::new(schema);
let orc_opener = OrcOpener::new(store.clone(), schema.clone(), None);
let path = "test.orc";
let store = test_store("/");
let schema = Arc::new(OrcFormat.infer_schema(&store, path).await.unwrap());
let file_source = Arc::new(OrcSource::default());
let tests = [
Test {
config: scan_config(schema.clone(), None, path),
opener: orc_opener.clone(),
config: scan_config(schema.clone(), None, path, file_source.clone()),
file_source: file_source.clone(),
expected: vec![
"+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+",
"| double_a | a | b | str_direct | d | e | f | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple | date_simple |",
@@ -235,8 +216,8 @@ async fn test_orc_opener() {
],
},
Test {
config: scan_config(schema.clone(), Some(1), path),
opener: orc_opener.clone(),
config: scan_config(schema.clone(), Some(1), path, file_source.clone()),
file_source,
expected: vec![
"+----------+-----+------+------------+---+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+-------------------------+-------------+",
"| double_a | a | b | str_direct | d | e | f | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple | date_simple |",
@@ -248,7 +229,7 @@ async fn test_orc_opener() {
];
for test in tests {
test.run().await;
test.run(&store).await;
}
}

View File

@@ -16,12 +16,12 @@ use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datafusion::common::{Constraints, Statistics};
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
CsvConfig, CsvOpener, FileScanConfig, FileStream, JsonOpener,
CsvSource, FileGroup, FileScanConfig, FileScanConfigBuilder, FileSource, FileStream,
JsonOpener, JsonSource,
};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use object_store::services::Fs;
@@ -68,21 +68,20 @@ pub fn test_basic_schema() -> SchemaRef {
Arc::new(schema)
}
pub fn scan_config(file_schema: SchemaRef, limit: Option<usize>, filename: &str) -> FileScanConfig {
pub(crate) fn scan_config(
file_schema: SchemaRef,
limit: Option<usize>,
filename: &str,
file_source: Arc<dyn FileSource>,
) -> FileScanConfig {
// object_store only recognize the Unix style path, so make it happy.
let filename = &filename.replace('\\', "/");
let statistics = Statistics::new_unknown(file_schema.as_ref());
FileScanConfig {
object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
file_schema,
file_groups: vec![vec![PartitionedFile::new(filename.to_string(), 10)]],
constraints: Constraints::empty(),
statistics,
projection: None,
limit,
table_partition_cols: vec![],
output_ordering: vec![],
}
let file_group = FileGroup::new(vec![PartitionedFile::new(filename.to_string(), 4096)]);
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_schema, file_source)
.with_file_group(file_group)
.with_limit(limit)
.build()
}
pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usize) -> usize) {
@@ -99,9 +98,14 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
let size = store.read(origin_path).await.unwrap().len();
let config = scan_config(schema.clone(), None, origin_path);
let stream = FileStream::new(&config, 0, json_opener, &ExecutionPlanMetricsSet::new()).unwrap();
let config = scan_config(schema, None, origin_path, Arc::new(JsonSource::new()));
let stream = FileStream::new(
&config,
0,
Arc::new(json_opener),
&ExecutionPlanMetricsSet::new(),
)
.unwrap();
let (tmp_store, dir) = test_tmp_store("test_stream_to_json");
@@ -127,24 +131,17 @@ pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usiz
let schema = test_basic_schema();
let csv_config = Arc::new(CsvConfig::new(
TEST_BATCH_SIZE,
schema.clone(),
None,
true,
b',',
b'"',
None,
Arc::new(object_store_opendal::OpendalStore::new(store.clone())),
None,
));
let csv_opener = CsvOpener::new(csv_config, FileCompressionType::UNCOMPRESSED);
let csv_source = CsvSource::new(true, b',', b'"')
.with_schema(schema.clone())
.with_batch_size(TEST_BATCH_SIZE);
let config = scan_config(schema, None, origin_path, csv_source.clone());
let size = store.read(origin_path).await.unwrap().len();
let config = scan_config(schema.clone(), None, origin_path);
let csv_opener = csv_source.create_file_opener(
Arc::new(object_store_opendal::OpendalStore::new(store.clone())),
&config,
0,
);
let stream = FileStream::new(&config, 0, csv_opener, &ExecutionPlanMetricsSet::new()).unwrap();
let (tmp_store, dir) = test_tmp_store("test_stream_to_csv");

View File

@@ -25,14 +25,14 @@
use std::sync::Arc;
use arrow::array::StructArray;
use arrow_schema::Fields;
use arrow_schema::{FieldRef, Fields};
use common_telemetry::debug;
use datafusion::functions_aggregate::all_default_aggregate_functions;
use datafusion::optimizer::analyzer::type_coercion::TypeCoercion;
use datafusion::optimizer::AnalyzerRule;
use datafusion::physical_planner::create_aggregate_expr_and_maybe_filter;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::expr::AggregateFunction;
use datafusion_expr::expr::{AggregateFunction, AggregateFunctionParams};
use datafusion_expr::function::StateFieldsArgs;
use datafusion_expr::{
Accumulator, Aggregate, AggregateUDF, AggregateUDFImpl, Expr, ExprSchemable, LogicalPlan,
@@ -146,6 +146,7 @@ impl StateMergeHelper {
};
let original_input_types = aggr_func
.params
.args
.iter()
.map(|e| e.get_type(&aggr.input.schema()))
@@ -156,11 +157,7 @@ impl StateMergeHelper {
let expr = AggregateFunction {
func: Arc::new(state_func.into()),
args: aggr_func.args.clone(),
distinct: aggr_func.distinct,
filter: aggr_func.filter.clone(),
order_by: aggr_func.order_by.clone(),
null_treatment: aggr_func.null_treatment,
params: aggr_func.params.clone(),
};
let expr = Expr::AggregateFunction(expr);
let lower_state_output_col_name = expr.schema_name().to_string();
@@ -182,11 +179,10 @@ impl StateMergeHelper {
let arg = Expr::Column(Column::new_unqualified(lower_state_output_col_name));
let expr = AggregateFunction {
func: Arc::new(merge_func.into()),
args: vec![arg],
distinct: aggr_func.distinct,
filter: aggr_func.filter.clone(),
order_by: aggr_func.order_by.clone(),
null_treatment: aggr_func.null_treatment,
params: AggregateFunctionParams {
args: vec![arg],
..aggr_func.params.clone()
},
};
// alias to the original aggregate expr's schema name, so parent plan can refer to it
@@ -247,15 +243,8 @@ impl StateWrapper {
pub fn deduce_aggr_return_type(
&self,
acc_args: &datafusion_expr::function::AccumulatorArgs,
) -> datafusion_common::Result<DataType> {
let input_exprs = acc_args.exprs;
let input_schema = acc_args.schema;
let input_types = input_exprs
.iter()
.map(|e| e.data_type(input_schema))
.collect::<Result<Vec<_>, _>>()?;
let return_type = self.inner.return_type(&input_types)?;
Ok(return_type)
) -> datafusion_common::Result<FieldRef> {
self.inner.return_field(acc_args.schema.fields())
}
}
@@ -265,14 +254,13 @@ impl AggregateUDFImpl for StateWrapper {
acc_args: datafusion_expr::function::AccumulatorArgs<'b>,
) -> datafusion_common::Result<Box<dyn Accumulator>> {
// fix and recover proper acc args for the original aggregate function.
let state_type = acc_args.return_type.clone();
let state_type = acc_args.return_type().clone();
let inner = {
let old_return_type = self.deduce_aggr_return_type(&acc_args)?;
let acc_args = datafusion_expr::function::AccumulatorArgs {
return_type: &old_return_type,
return_field: self.deduce_aggr_return_type(&acc_args)?,
schema: acc_args.schema,
ignore_nulls: acc_args.ignore_nulls,
ordering_req: acc_args.ordering_req,
order_bys: acc_args.order_bys,
is_reversed: acc_args.is_reversed,
name: acc_args.name,
is_distinct: acc_args.is_distinct,
@@ -297,11 +285,15 @@ impl AggregateUDFImpl for StateWrapper {
/// Return state_fields as the output struct type.
///
fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
let old_return_type = self.inner.return_type(arg_types)?;
let input_fields = &arg_types
.iter()
.map(|x| Arc::new(Field::new("x", x.clone(), false)))
.collect::<Vec<_>>();
let state_fields_args = StateFieldsArgs {
name: self.inner().name(),
input_types: arg_types,
return_type: &old_return_type,
input_fields,
return_field: self.inner.return_field(input_fields)?,
// TODO(discord9): how to get this?, probably ok?
ordering_fields: &[],
is_distinct: false,
@@ -315,12 +307,11 @@ impl AggregateUDFImpl for StateWrapper {
fn state_fields(
&self,
args: datafusion_expr::function::StateFieldsArgs,
) -> datafusion_common::Result<Vec<Field>> {
let old_return_type = self.inner.return_type(args.input_types)?;
) -> datafusion_common::Result<Vec<FieldRef>> {
let state_fields_args = StateFieldsArgs {
name: args.name,
input_types: args.input_types,
return_type: &old_return_type,
input_fields: args.input_fields,
return_field: self.inner.return_field(args.input_fields)?,
ordering_fields: args.ordering_fields,
is_distinct: args.is_distinct,
};
@@ -502,7 +493,7 @@ impl AggregateUDFImpl for MergeWrapper {
fn state_fields(
&self,
_args: datafusion_expr::function::StateFieldsArgs,
) -> datafusion_common::Result<Vec<Field>> {
) -> datafusion_common::Result<Vec<FieldRef>> {
self.original_phy_expr.state_fields()
}
}

View File

@@ -35,7 +35,7 @@ use datafusion::prelude::SessionContext;
use datafusion_common::{Column, TableReference};
use datafusion_expr::expr::AggregateFunction;
use datafusion_expr::sqlparser::ast::NullTreatment;
use datafusion_expr::{Aggregate, Expr, LogicalPlan, SortExpr, TableScan};
use datafusion_expr::{lit, Aggregate, Expr, LogicalPlan, SortExpr, TableScan};
use datafusion_physical_expr::aggregate::AggregateExprBuilder;
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use datatypes::arrow_array::StringArray;
@@ -234,7 +234,7 @@ async fn test_sum_udaf() {
vec![Expr::Column(Column::new_unqualified("number"))],
false,
None,
None,
vec![],
None,
))],
)
@@ -250,7 +250,7 @@ async fn test_sum_udaf() {
vec![Expr::Column(Column::new_unqualified("number"))],
false,
None,
None,
vec![],
None,
))],
)
@@ -290,7 +290,7 @@ async fn test_sum_udaf() {
vec![Expr::Column(Column::new_unqualified("__sum_state(number)"))],
false,
None,
None,
vec![],
None,
))
.alias("sum(number)")],
@@ -378,7 +378,7 @@ async fn test_avg_udaf() {
vec![Expr::Column(Column::new_unqualified("number"))],
false,
None,
None,
vec![],
None,
))],
)
@@ -395,7 +395,7 @@ async fn test_avg_udaf() {
vec![Expr::Column(Column::new_unqualified("number"))],
false,
None,
None,
vec![],
None,
))],
)
@@ -449,7 +449,7 @@ async fn test_avg_udaf() {
vec![Expr::Column(Column::new_unqualified("__avg_state(number)"))],
false,
None,
None,
vec![],
None,
))
.alias("avg(number)")],
@@ -551,7 +551,7 @@ async fn test_udaf_correct_eval_result() {
expected_fn: Option<ExpectedFn>,
distinct: bool,
filter: Option<Box<Expr>>,
order_by: Option<Vec<SortExpr>>,
order_by: Vec<SortExpr>,
null_treatment: Option<NullTreatment>,
}
type ExpectedFn = fn(ArrayRef) -> bool;
@@ -575,7 +575,7 @@ async fn test_udaf_correct_eval_result() {
expected_fn: None,
distinct: false,
filter: None,
order_by: None,
order_by: vec![],
null_treatment: None,
},
TestCase {
@@ -596,7 +596,7 @@ async fn test_udaf_correct_eval_result() {
expected_fn: None,
distinct: false,
filter: None,
order_by: None,
order_by: vec![],
null_treatment: None,
},
TestCase {
@@ -619,7 +619,7 @@ async fn test_udaf_correct_eval_result() {
expected_fn: None,
distinct: false,
filter: None,
order_by: None,
order_by: vec![],
null_treatment: None,
},
TestCase {
@@ -630,8 +630,8 @@ async fn test_udaf_correct_eval_result() {
true,
)])),
args: vec![
Expr::Literal(ScalarValue::Int64(Some(128))),
Expr::Literal(ScalarValue::Float64(Some(0.05))),
lit(128i64),
lit(0.05f64),
Expr::Column(Column::new_unqualified("number")),
],
input: vec![Arc::new(Float64Array::from(vec![
@@ -659,7 +659,7 @@ async fn test_udaf_correct_eval_result() {
}),
distinct: false,
filter: None,
order_by: None,
order_by: vec![],
null_treatment: None,
},
TestCase {
@@ -690,7 +690,7 @@ async fn test_udaf_correct_eval_result() {
}),
distinct: false,
filter: None,
order_by: None,
order_by: vec![],
null_treatment: None,
},
// TODO(discord9): udd_merge/hll_merge/geo_path/quantile_aggr tests

View File

@@ -41,7 +41,7 @@ use datatypes::arrow::array::{
Array, ArrayRef, AsArray, BooleanArray, Int64Array, ListArray, UInt64Array,
};
use datatypes::arrow::buffer::{OffsetBuffer, ScalarBuffer};
use datatypes::arrow::datatypes::{DataType, Field};
use datatypes::arrow::datatypes::{DataType, Field, FieldRef};
use crate::function_registry::FunctionRegistry;
@@ -94,14 +94,14 @@ impl AggregateUDFImpl for CountHash {
false
}
fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
Ok(vec![Field::new_list(
fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
Ok(vec![Arc::new(Field::new_list(
format_state_name(args.name, "count_hash"),
Field::new_list_field(DataType::UInt64, true),
// For count_hash accumulator, null list item stands for an
// empty value set (i.e., all NULL value so far for that group).
true,
)])
))])
}
fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {

View File

@@ -23,6 +23,7 @@ use datafusion::logical_expr::Volatility;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::{ensure, ResultExt};
use sql::ast::ObjectNamePartExt;
use sql::parser::ParserContext;
use store_api::storage::ConcreteDataType;
@@ -85,9 +86,9 @@ fn parse_flush_flow(
let (catalog_name, flow_name) = match &obj_name.0[..] {
[flow_name] => (
query_ctx.current_catalog().to_string(),
flow_name.value.clone(),
flow_name.to_string_unquoted(),
),
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
[catalog, flow_name] => (catalog.to_string_unquoted(), flow_name.to_string_unquoted()),
_ => {
return InvalidFuncArgsSnafu {
err_msg: format!(

View File

@@ -113,6 +113,8 @@ mod tests {
use common_query::prelude::ScalarValue;
use datafusion::arrow::array::BooleanArray;
use datafusion_common::config::ConfigOptions;
use datatypes::arrow::datatypes::Field;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::vectors::{BooleanVector, ConstantVector};
@@ -162,10 +164,21 @@ mod tests {
]))),
];
let arg_fields = vec![
Arc::new(Field::new("a", args[0].data_type(), false)),
Arc::new(Field::new("b", args[1].data_type(), false)),
];
let return_field = Arc::new(Field::new(
"x",
ConcreteDataType::boolean_datatype().as_arrow_type(),
false,
));
let args = ScalarFunctionArgs {
args,
arg_fields,
number_rows: 4,
return_type: &ConcreteDataType::boolean_datatype().as_arrow_type(),
return_field,
config_options: Arc::new(ConfigOptions::default()),
};
match udf.invoke_with_args(args).unwrap() {
datafusion_expr::ColumnarValue::Array(x) => {

View File

@@ -20,7 +20,7 @@ common-telemetry.workspace = true
common-time.workspace = true
dashmap.workspace = true
datatypes.workspace = true
flatbuffers = "24"
flatbuffers = "25.2"
hyper.workspace = true
lazy_static.workspace = true
prost.workspace = true

View File

@@ -25,7 +25,7 @@ use common_recordbatch::DfRecordBatch;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::buffer::Buffer;
use datatypes::arrow::datatypes::{Schema as ArrowSchema, SchemaRef};
use datatypes::arrow::datatypes::{DataType, Schema as ArrowSchema, SchemaRef};
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::ipc::{convert, reader, root_as_message, writer, MessageHeader};
use flatbuffers::FlatBufferBuilder;
@@ -91,7 +91,15 @@ impl FlightEncoder {
/// be encoded to exactly one [FlightData].
pub fn encode(&mut self, flight_message: FlightMessage) -> Vec1<FlightData> {
match flight_message {
FlightMessage::Schema(schema) => vec1![self.encode_schema(schema.as_ref())],
FlightMessage::Schema(schema) => {
schema.fields().iter().for_each(|x| {
if matches!(x.data_type(), DataType::Dictionary(_, _)) {
self.dictionary_tracker.next_dict_id();
}
});
vec1![self.encode_schema(schema.as_ref())]
}
FlightMessage::RecordBatch(record_batch) => {
let (encoded_dictionaries, encoded_batch) = self
.data_gen

View File

@@ -24,7 +24,7 @@ use datafusion::error::Result as DatafusionResult;
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
use datafusion_common::{Column, TableReference};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{col, DmlStatement, WriteOp};
use datafusion_expr::{col, DmlStatement, TableSource, WriteOp};
pub use expr::{build_filter_from_timestamp, build_same_type_ts_filter};
use snafu::ResultExt;
@@ -131,7 +131,7 @@ pub fn breakup_insert_plan(
/// create a `insert into table_name <input>` logical plan
pub fn add_insert_to_logical_plan(
table_name: TableName,
table_schema: datafusion_common::DFSchemaRef,
target: Arc<dyn TableSource>,
input: LogicalPlan,
) -> Result<LogicalPlan> {
let table_name = TableReference::Full {
@@ -142,7 +142,7 @@ pub fn add_insert_to_logical_plan(
let plan = LogicalPlan::Dml(DmlStatement::new(
table_name,
table_schema,
target,
WriteOp::Insert(InsertOp::Append),
Arc::new(input),
));

View File

@@ -87,29 +87,31 @@ fn timestamp_to_literal(timestamp: &Timestamp) -> Expr {
TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(timestamp.value()), None),
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(timestamp.value()), None),
};
Expr::Literal(scalar_value)
Expr::Literal(scalar_value, None)
}
#[cfg(test)]
mod tests {
use datafusion_expr::Literal;
use super::*;
#[test]
fn test_timestamp_to_literal() {
let timestamp = Timestamp::new(123456789, TimeUnit::Second);
let expected = Expr::Literal(ScalarValue::TimestampSecond(Some(123456789), None));
let expected = ScalarValue::TimestampSecond(Some(123456789), None).lit();
assert_eq!(timestamp_to_literal(&timestamp), expected);
let timestamp = Timestamp::new(123456789, TimeUnit::Millisecond);
let expected = Expr::Literal(ScalarValue::TimestampMillisecond(Some(123456789), None));
let expected = ScalarValue::TimestampMillisecond(Some(123456789), None).lit();
assert_eq!(timestamp_to_literal(&timestamp), expected);
let timestamp = Timestamp::new(123456789, TimeUnit::Microsecond);
let expected = Expr::Literal(ScalarValue::TimestampMicrosecond(Some(123456789), None));
let expected = ScalarValue::TimestampMicrosecond(Some(123456789), None).lit();
assert_eq!(timestamp_to_literal(&timestamp), expected);
let timestamp = Timestamp::new(123456789, TimeUnit::Nanosecond);
let expected = Expr::Literal(ScalarValue::TimestampNanosecond(Some(123456789), None));
let expected = ScalarValue::TimestampNanosecond(Some(123456789), None).lit();
assert_eq!(timestamp_to_literal(&timestamp), expected);
}
}

View File

@@ -26,7 +26,7 @@ use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::{
Accumulator, AccumulatorFactoryFunction, AggregateUDF as DfAggregateUdf, AggregateUDFImpl,
};
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, FieldRef};
use datatypes::data_type::DataType;
use crate::function::{
@@ -129,14 +129,14 @@ impl AggregateUDFImpl for DfUdafAdapter {
(self.accumulator)(acc_args)
}
fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
let state_types = self.creator.state_types()?;
let fields = state_types
.into_iter()
.enumerate()
.map(|(i, t)| {
let name = format!("{}_{i}", args.name);
Field::new(name, t.as_arrow_type(), true)
Arc::new(Field::new(name, t.as_arrow_type(), true))
})
.collect::<Vec<_>>();
Ok(fields)

View File

@@ -134,11 +134,11 @@ where
b.and_then(|b| {
let projected_column = b.project(&projection)?;
if projected_column.schema().fields.len() != projected_schema.fields.len() {
return Err(DataFusionError::ArrowError(ArrowError::SchemaError(format!(
return Err(DataFusionError::ArrowError(Box::new(ArrowError::SchemaError(format!(
"Trying to cast a RecordBatch into an incompatible schema. RecordBatch: {}, Target: {}",
projected_column.schema(),
projected_schema,
)), None));
))), None));
}
let mut columns = Vec::with_capacity(projected_schema.fields.len());
@@ -360,7 +360,7 @@ impl ExecutionPlanVisitor for MetricCollector {
// skip if no metric available
let Some(metric) = plan.metrics() else {
self.record_batch_metrics.plan_metrics.push(PlanMetrics {
plan: std::any::type_name::<Self>().to_string(),
plan: plan.name().to_string(),
level: self.current_level,
metrics: vec![],
});

View File

@@ -77,7 +77,7 @@ impl SimpleFilterEvaluator {
_ => return None,
}
let Expr::Literal(val) = lit.lit() else {
let Expr::Literal(val, _) = lit.lit() else {
return None;
};
@@ -143,8 +143,8 @@ impl SimpleFilterEvaluator {
// swap the expr if it is in the form of `literal` `op` `col`
let mut op = binary.op;
let (lhs, rhs) = match (&*binary.left, &*binary.right) {
(Expr::Column(ref col), Expr::Literal(ref lit)) => (col, lit),
(Expr::Literal(ref lit), Expr::Column(ref col)) => {
(Expr::Column(col), Expr::Literal(lit, _)) => (col, lit),
(Expr::Literal(lit, _), Expr::Column(col)) => {
// safety: The previous check ensures the operator is able to swap.
op = op.swap().unwrap();
(col, lit)
@@ -359,15 +359,15 @@ mod test {
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(Column::from_name("foo"))),
op: Operator::Plus,
right: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))),
right: Box::new(1.lit()),
});
assert!(SimpleFilterEvaluator::try_new(&expr).is_none());
// two literal is not supported
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))),
left: Box::new(1.lit()),
op: Operator::Eq,
right: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))),
right: Box::new(1.lit()),
});
assert!(SimpleFilterEvaluator::try_new(&expr).is_none());
@@ -384,10 +384,10 @@ mod test {
left: Box::new(Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(Column::from_name("foo"))),
op: Operator::Eq,
right: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))),
right: Box::new(1.lit()),
})),
op: Operator::Eq,
right: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))),
right: Box::new(1.lit()),
});
assert!(SimpleFilterEvaluator::try_new(&expr).is_none());
}
@@ -398,13 +398,13 @@ mod test {
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(Column::from_name("foo"))),
op: Operator::Eq,
right: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))),
right: Box::new(1.lit()),
});
let _ = SimpleFilterEvaluator::try_new(&expr).unwrap();
// swap operands
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))),
left: Box::new(1.lit()),
op: Operator::Lt,
right: Box::new(Expr::Column(Column::from_name("foo"))),
});
@@ -418,7 +418,7 @@ mod test {
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(Column::from_name("foo"))),
op: Operator::Eq,
right: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))),
right: Box::new(1i64.lit()),
});
let evaluator = SimpleFilterEvaluator::try_new(&expr).unwrap();
@@ -440,7 +440,7 @@ mod test {
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(Column::from_name("foo"))),
op: Operator::Lt,
right: Box::new(Expr::Literal(ScalarValue::Int64(Some(1)))),
right: Box::new(1i64.lit()),
});
let evaluator = SimpleFilterEvaluator::try_new(&expr).unwrap();

View File

@@ -16,6 +16,7 @@ use common_time::timezone::Timezone;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::constraint::{CURRENT_TIMESTAMP, CURRENT_TIMESTAMP_FN};
use datatypes::schema::ColumnDefaultConstraint;
use sqlparser::ast::ValueWithSpan;
pub use sqlparser::ast::{
visit_expressions_mut, visit_statements_mut, BinaryOperator, ColumnDef, ColumnOption,
ColumnOptionDef, DataType, Expr, Function, FunctionArg, FunctionArgExpr, FunctionArguments,
@@ -38,7 +39,7 @@ pub fn parse_column_default_constraint(
{
let default_constraint = match &opt.option {
ColumnOption::Default(Expr::Value(v)) => ColumnDefaultConstraint::Value(
sql_value_to_value(column_name, data_type, v, timezone, None, false)?,
sql_value_to_value(column_name, data_type, &v.value, timezone, None, false)?,
),
ColumnOption::Default(Expr::Function(func)) => {
let mut func = format!("{func}").to_lowercase();
@@ -54,8 +55,13 @@ pub fn parse_column_default_constraint(
// Specialized process for handling numerical inputs to prevent
// overflow errors during the parsing of negative numbers,
// See https://github.com/GreptimeTeam/greptimedb/issues/4351
if let (UnaryOperator::Minus, Expr::Value(SqlValue::Number(n, _))) =
(op, expr.as_ref())
if let (
UnaryOperator::Minus,
Expr::Value(ValueWithSpan {
value: SqlValue::Number(n, _),
span: _,
}),
) = (op, expr.as_ref())
{
return Ok(Some(ColumnDefaultConstraint::Value(sql_number_to_value(
data_type,
@@ -64,8 +70,14 @@ pub fn parse_column_default_constraint(
}
if let Expr::Value(v) = &**expr {
let value =
sql_value_to_value(column_name, data_type, v, timezone, Some(*op), false)?;
let value = sql_value_to_value(
column_name,
data_type,
&v.value,
timezone,
Some(*op),
false,
)?;
ColumnDefaultConstraint::Value(value)
} else {
return UnsupportedDefaultValueSnafu {
@@ -85,7 +97,7 @@ pub fn parse_column_default_constraint(
_ => {
return UnsupportedDefaultValueSnafu {
column_name,
expr: Expr::Value(SqlValue::Null),
expr: Expr::Value(SqlValue::Null.into()),
}
.fail();
}
@@ -113,7 +125,7 @@ mod test {
let opts = vec![
ColumnOptionDef {
name: None,
option: ColumnOption::Default(Expr::Value(bool_value)),
option: ColumnOption::Default(Expr::Value(bool_value.into())),
},
ColumnOptionDef {
name: None,
@@ -139,7 +151,9 @@ mod test {
name: None,
option: ColumnOption::Default(Expr::UnaryOp {
op: UnaryOperator::Minus,
expr: Box::new(Expr::Value(SqlValue::Number("32768".to_string(), false))),
expr: Box::new(Expr::Value(
SqlValue::Number("32768".to_string(), false).into(),
)),
}),
}];
@@ -161,10 +175,9 @@ mod test {
fn test_incorrect_default_value_issue_3479() {
let opts = vec![ColumnOptionDef {
name: None,
option: ColumnOption::Default(Expr::Value(SqlValue::Number(
"0.047318541668048164".into(),
false,
))),
option: ColumnOption::Default(Expr::Value(
SqlValue::Number("0.047318541668048164".into(), false).into(),
)),
}];
let constraint = parse_column_default_constraint(
"coll",