chore: upgrade DataFusion family (#7558)

* chore: upgrade DataFusion family

Signed-off-by: luofucong <luofc@foxmail.com>

* use main proto

Signed-off-by: luofucong <luofc@foxmail.com>

* fix ci

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
LFC
2026-01-14 22:02:31 +08:00
committed by GitHub
parent a5cb0116a2
commit e64c31e59a
96 changed files with 2003 additions and 1531 deletions

View File

@@ -18,6 +18,7 @@ use common_datasource::file_format::Format;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::parquet::DefaultParquetFileReaderFactory;
use datafusion::common::ToDFSchema;
use datafusion::config::CsvOptions;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
@@ -34,7 +35,6 @@ use datafusion::prelude::SessionContext;
use datafusion_expr::expr::Expr;
use datafusion_expr::utils::conjunction;
use datafusion_orc::OrcSource;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
use snafu::ResultExt;
@@ -45,7 +45,6 @@ const DEFAULT_BATCH_SIZE: usize = 8192;
fn build_record_batch_stream(
scan_plan_config: &ScanPlanConfig,
file_schema: Arc<ArrowSchema>,
limit: Option<usize>,
file_source: Arc<dyn FileSource>,
) -> Result<DfSendableRecordBatchStream> {
@@ -55,15 +54,12 @@ fn build_record_batch_stream(
.map(|filename| PartitionedFile::new(filename.clone(), 0))
.collect::<Vec<_>>();
let config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
file_source.clone(),
)
.with_projection(scan_plan_config.projection.cloned())
.with_limit(limit)
.with_file_group(FileGroup::new(files))
.build();
let config =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source.clone())
.with_projection_indices(scan_plan_config.projection.cloned())
.with_limit(limit)
.with_file_group(FileGroup::new(files))
.build();
let store = Arc::new(object_store_opendal::OpendalStore::new(
scan_plan_config.store.clone(),
@@ -89,11 +85,14 @@ fn new_csv_stream(
// push down limit only if there is no filter
let limit = config.filters.is_empty().then_some(config.limit).flatten();
let csv_source = CsvSource::new(format.has_header, format.delimiter, b'"')
.with_schema(file_schema.clone())
let options = CsvOptions::default()
.with_has_header(format.has_header)
.with_delimiter(format.delimiter);
let csv_source = CsvSource::new(file_schema)
.with_csv_options(options)
.with_batch_size(DEFAULT_BATCH_SIZE);
build_record_batch_stream(config, file_schema, limit, csv_source)
build_record_batch_stream(config, limit, csv_source)
}
fn new_json_stream(config: &ScanPlanConfig) -> Result<DfSendableRecordBatchStream> {
@@ -102,8 +101,8 @@ fn new_json_stream(config: &ScanPlanConfig) -> Result<DfSendableRecordBatchStrea
// push down limit only if there is no filter
let limit = config.filters.is_empty().then_some(config.limit).flatten();
let file_source = JsonSource::new().with_batch_size(DEFAULT_BATCH_SIZE);
build_record_batch_stream(config, file_schema, limit, file_source)
let file_source = JsonSource::new(file_schema).with_batch_size(DEFAULT_BATCH_SIZE);
build_record_batch_stream(config, limit, file_source)
}
fn new_parquet_stream_with_exec_plan(
@@ -126,9 +125,10 @@ fn new_parquet_stream_with_exec_plan(
.collect::<Vec<_>>(),
);
let mut parquet_source = ParquetSource::default().with_parquet_file_reader_factory(Arc::new(
DefaultParquetFileReaderFactory::new(store.clone()),
));
let mut parquet_source = ParquetSource::new(file_schema.clone())
.with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(
store.clone(),
)));
// build predicate filter
let filters = filters.to_vec();
@@ -143,15 +143,12 @@ fn new_parquet_stream_with_exec_plan(
parquet_source = parquet_source.with_predicate(filters);
};
let file_scan_config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
Arc::new(parquet_source),
)
.with_file_group(file_group)
.with_projection(projection.cloned())
.with_limit(*limit)
.build();
let file_scan_config =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), Arc::new(parquet_source))
.with_file_group(file_group)
.with_projection_indices(projection.cloned())
.with_limit(*limit)
.build();
// TODO(ruihang): get this from upper layer
let task_ctx = SessionContext::default().task_ctx();
@@ -170,8 +167,8 @@ fn new_orc_stream(config: &ScanPlanConfig) -> Result<DfSendableRecordBatchStream
// push down limit only if there is no filter
let limit = config.filters.is_empty().then_some(config.limit).flatten();
let file_source = OrcSource::default().with_batch_size(DEFAULT_BATCH_SIZE);
build_record_batch_stream(config, file_schema, limit, file_source)
let file_source = OrcSource::new(file_schema.into()).with_batch_size(DEFAULT_BATCH_SIZE);
build_record_batch_stream(config, limit, file_source)
}
#[derive(Debug, Clone)]