chore: upgrade DataFusion family, again (#7578)

* chore: upgrade DataFusion family

Signed-off-by: luofucong <luofc@foxmail.com>

* chore: switch to released version of datafusion-pg-catalog

---------

Signed-off-by: luofucong <luofc@foxmail.com>
Co-authored-by: Ning Sun <sunning@greptime.com>
Co-authored-by: Ning Sun <sunng@protonmail.com>
This commit is contained in:
LFC
2026-03-03 15:36:39 +08:00
committed by GitHub
parent aab839b6e4
commit b2074e3863
135 changed files with 1589 additions and 2555 deletions

View File

@@ -10,7 +10,7 @@ workspace = true
[dependencies]
arrow.workspace = true
arrow-schema.workspace = true
async-compression = { version = "0.3", features = [
async-compression = { version = "0.4", features = [
"bzip2",
"gzip",
"xz",

View File

@@ -203,8 +203,8 @@ pub enum Error {
error: parquet::errors::ParquetError,
},
#[snafu(display("Failed to build file stream"))]
BuildFileStream {
#[snafu(transparent)]
DataFusion {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
@@ -241,8 +241,9 @@ impl ErrorExt for Error {
| ReadRecordBatch { .. }
| WriteRecordBatch { .. }
| EncodeRecordBatch { .. }
| BuildFileStream { .. }
| OrcReader { .. } => StatusCode::Unexpected,
DataFusion { .. } => StatusCode::Internal,
}
}

View File

@@ -313,16 +313,13 @@ pub async fn file_to_stream(
filename.to_string(),
0,
)]))
.with_projection_indices(projection)
.with_projection_indices(projection)?
.with_file_compression_type(df_compression)
.build();
let store = Arc::new(OpendalStore::new(store.clone()));
let file_opener = file_source
.with_projection(&config)
.create_file_opener(store, &config, 0);
let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new())
.context(error::BuildFileStreamSnafu)?;
let file_opener = config.file_source().create_file_opener(store, &config, 0)?;
let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new())?;
Ok(Box::pin(stream))
}

View File

@@ -46,7 +46,10 @@ struct Test<'a> {
impl Test<'_> {
async fn run(self, store: &ObjectStore) {
let store = Arc::new(object_store_opendal::OpendalStore::new(store.clone()));
let file_opener = self.file_source.create_file_opener(store, &self.config, 0);
let file_opener = self
.file_source
.create_file_opener(store, &self.config, 0)
.unwrap();
let result = FileStream::new(
&self.config,

View File

@@ -104,6 +104,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
schema.clone(),
FileCompressionType::UNCOMPRESSED,
Arc::new(object_store_opendal::OpendalStore::new(store.clone())),
true,
);
let size = store.read(origin_path).await.unwrap().len();
@@ -154,11 +155,13 @@ pub async fn setup_stream_to_csv_test(
let config = scan_config(None, origin_path, csv_source.clone());
let size = store.read(origin_path).await.unwrap().len();
let csv_opener = csv_source.create_file_opener(
Arc::new(object_store_opendal::OpendalStore::new(store.clone())),
&config,
0,
);
let csv_opener = csv_source
.create_file_opener(
Arc::new(object_store_opendal::OpendalStore::new(store.clone())),
&config,
0,
)
.unwrap();
let stream = FileStream::new(&config, 0, csv_opener, &ExecutionPlanMetricsSet::new()).unwrap();
let (tmp_store, dir) = test_tmp_store("test_stream_to_csv");

View File

@@ -61,18 +61,18 @@ use crate::scalars::uddsketch_calc::UddSketchCalcFunction;
pub struct MockInputExec {
input: Vec<RecordBatch>,
schema: SchemaRef,
properties: PlanProperties,
properties: Arc<PlanProperties>,
}
impl MockInputExec {
pub fn new(input: Vec<RecordBatch>, schema: SchemaRef) -> Self {
Self {
properties: PlanProperties::new(
properties: Arc::new(PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Bounded,
),
)),
input,
schema,
}
@@ -94,7 +94,7 @@ impl ExecutionPlan for MockInputExec {
self
}
fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}

View File

@@ -428,7 +428,7 @@ impl Accumulator for CountHashAccumulator {
&self.random_state,
&mut self.batch_hashes,
)?;
for hash in hashes.as_slice() {
for hash in hashes {
self.values.insert(*hash);
}
Ok(())

View File

@@ -48,7 +48,7 @@ pub fn rename_logical_plan_columns(
plan.schema().qualified_field_from_column(&old_column)?;
for (qualifier, field) in plan.schema().iter() {
if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
if qualifier.eq(&qualifier_rename) && field == field_rename {
projection.push(col(Column::from((qualifier, field))).alias(new_name));
}
}

View File

@@ -32,7 +32,7 @@ pub struct StreamScanAdapter {
stream: Mutex<Option<SendableRecordBatchStream>>,
schema: SchemaRef,
arrow_schema: ArrowSchemaRef,
properties: PlanProperties,
properties: Arc<PlanProperties>,
output_ordering: Option<Vec<PhysicalSortExpr>>,
}
@@ -49,12 +49,12 @@ impl StreamScanAdapter {
pub fn new(stream: SendableRecordBatchStream) -> Self {
let schema = stream.schema();
let arrow_schema = schema.arrow_schema().clone();
let properties = PlanProperties::new(
let properties = Arc::new(PlanProperties::new(
EquivalenceProperties::new(arrow_schema.clone()),
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Bounded,
);
));
Self {
stream: Mutex::new(Some(stream)),
@@ -91,7 +91,7 @@ impl ExecutionPlan for StreamScanAdapter {
self.arrow_schema.clone()
}
fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}