feat: bump datafusion to 53 (#8107)

* feat: bump datafusion to 53

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use updated datafusion-orc

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* maintain topk in part sort

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* docs: clarify part_sort range detection and Null threshold semantics

* fix limit 0

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: discord9 <discord9@163.com>
This commit is contained in:
Ruihang Xia
2026-05-14 16:33:31 +08:00
committed by GitHub
parent 8a8469e1f6
commit 3b3f5d628d
19 changed files with 922 additions and 824 deletions

View File

@@ -28,11 +28,12 @@ common-runtime.workspace = true
common-telemetry.workspace = true
datafusion.workspace = true
datafusion-datasource.workspace = true
datafusion-orc.workspace = true
datatypes.workspace = true
futures.workspace = true
lazy_static.workspace = true
object-store.workspace = true
orc-rust = { version = "0.7", default-features = false, features = ["async"] }
orc-rust = { version = "0.8", default-features = false, features = ["async"] }
parquet.workspace = true
paste.workspace = true
regex.workspace = true
@@ -45,4 +46,3 @@ url.workspace = true
[dev-dependencies]
common-test-util.workspace = true
datafusion-orc.workspace = true

View File

@@ -15,6 +15,7 @@
use arrow_schema::Schema;
use async_trait::async_trait;
use bytes::Bytes;
pub use datafusion_orc::OrcSource;
use futures::FutureExt;
use futures::future::BoxFuture;
use object_store::ObjectStore;

View File

@@ -176,11 +176,14 @@ impl AsyncFileReader for LazyParquetFileReader {
.map_err(|e| ParquetError::External(Box::new(e)))?;
let metadata_opts = options.map(|o| o.metadata_options().clone());
let column_index_policy =
options.map_or(PageIndexPolicy::Skip, |o| o.column_index_policy());
let offset_index_policy =
options.map_or(PageIndexPolicy::Skip, |o| o.offset_index_policy());
let metadata_reader = ParquetMetaDataReader::new()
.with_metadata_options(metadata_opts)
.with_page_index_policy(PageIndexPolicy::from(
options.is_some_and(|o| o.page_index()),
))
.with_column_index_policy(column_index_policy)
.with_offset_index_policy(offset_index_policy)
.with_prefetch_hint(self.metadata_size_hint);
let metadata = metadata_reader

View File

@@ -26,11 +26,11 @@ use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::prelude::SessionContext;
use datafusion_orc::OrcSource;
use futures::StreamExt;
use object_store::ObjectStore;
use super::FORMAT_TYPE;
use crate::file_format::orc::OrcSource;
use crate::file_format::parquet::DefaultParquetFileReaderFactory;
use crate::file_format::{FileFormat, Format, OrcFormat};
use crate::test_util::{basic_schema_with_time_format, scan_config, test_basic_schema, test_store};

View File

@@ -26,7 +26,6 @@ common-test-util = { workspace = true, optional = true }
common-time.workspace = true
datafusion.workspace = true
datafusion-expr.workspace = true
datafusion-orc.workspace = true
datatypes.workspace = true
futures.workspace = true
object-store.workspace = true

View File

@@ -16,6 +16,7 @@ use std::sync::Arc;
use common_datasource::file_format::Format;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::orc::OrcSource;
use common_datasource::file_format::parquet::DefaultParquetFileReaderFactory;
use datafusion::common::ToDFSchema;
use datafusion::config::CsvOptions;
@@ -34,7 +35,6 @@ use datafusion::physical_plan::{
use datafusion::prelude::SessionContext;
use datafusion_expr::expr::Expr;
use datafusion_expr::utils::conjunction;
use datafusion_orc::OrcSource;
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
use snafu::ResultExt;

View File

@@ -127,7 +127,10 @@ pub(crate) fn assert_parquet_metadata_equal(x: Arc<ParquetMetaData>, y: Arc<Parq
.statistics()
.cloned()
.map(unset_min_max_backwards_compatible_flag);
let mut col_builder = col.into_builder().clear_statistics();
let mut col_builder = col
.into_builder()
.clear_statistics()
.clear_page_encoding_stats();
if let Some(stats) = stats {
col_builder = col_builder.set_statistics(stats);
}

View File

@@ -1202,7 +1202,7 @@ impl BulkPartEncoder {
WriterProperties::builder()
.set_key_value_metadata(Some(vec![key_value_meta]))
.set_write_batch_size(row_group_size)
.set_max_row_group_size(row_group_size)
.set_max_row_group_row_count(Some(row_group_size))
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_column_index_truncate_length(None)
.set_statistics_truncate_length(None)

View File

@@ -756,7 +756,7 @@ impl<'a> DataPartEncoder<'a> {
fn writer_props(self) -> WriterProperties {
let mut builder = WriterProperties::builder();
if let Some(row_group_size) = self.row_group_size {
builder = builder.set_max_row_group_size(row_group_size)
builder = builder.set_max_row_group_row_count(Some(row_group_size))
}
let ts_col = ColumnPath::new(vec![self.timestamp_column_name]);

View File

@@ -590,7 +590,7 @@ mod tests {
.set_key_value_metadata(Some(vec![key_value_meta]))
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_encoding(Encoding::PLAIN)
.set_max_row_group_size(write_opts.row_group_size);
.set_max_row_group_row_count(Some(write_opts.row_group_size));
let writer_props = props_builder.build();

View File

@@ -425,7 +425,7 @@ where
.set_key_value_metadata(Some(vec![key_value_meta]))
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_encoding(Encoding::PLAIN)
.set_max_row_group_size(opts.row_group_size)
.set_max_row_group_row_count(Some(opts.row_group_size))
.set_column_index_truncate_length(None)
.set_statistics_truncate_length(None);

View File

@@ -43,7 +43,6 @@ uuid.workspace = true
[dev-dependencies]
anyhow = "1.0"
arrow_object_store.workspace = true
common-telemetry.workspace = true
common-test-util.workspace = true
object_store_opendal.workspace = true

View File

@@ -23,13 +23,13 @@ use async_trait::async_trait;
use bytes::Bytes;
use datafusion_object_store::path::Path;
use datafusion_object_store::{
Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPayload, ListResult,
MultipartUpload, ObjectMeta, ObjectStore as ArrowObjectStore, PutMode, PutMultipartOptions,
PutOptions, PutPayload, PutResult, UploadPart,
Attribute, Attributes, CopyMode, CopyOptions, GetOptions, GetRange, GetResult,
GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore as ArrowObjectStore,
PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, UploadPart,
};
use futures::stream::BoxStream;
use futures::{FutureExt, StreamExt, TryStreamExt};
use opendal::options::CopyOptions;
use opendal::options::CopyOptions as OpendalCopyOptions;
use opendal::raw::percent_decode_path;
use opendal::{Buffer, Operator, OperatorInfo, Writer};
use tokio::sync::{Mutex, oneshot};
@@ -107,7 +107,7 @@ impl OpendalStore {
to: &Path,
if_not_exists: bool,
) -> datafusion_object_store::Result<()> {
let mut copy_options = CopyOptions::default();
let mut copy_options = OpendalCopyOptions::default();
if if_not_exists {
copy_options.if_not_exists = true;
}
@@ -212,22 +212,6 @@ impl ArrowObjectStore for OpendalStore {
Ok(PutResult { e_tag, version })
}
async fn put_multipart(
&self,
location: &Path,
) -> datafusion_object_store::Result<Box<dyn MultipartUpload>> {
let decoded_location = percent_decode_path(location.as_ref());
let writer = self
.inner
.writer_with(&decoded_location)
.concurrent(8)
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
let upload = OpendalMultipartUpload::new(writer, location.clone());
Ok(Box::new(upload))
}
async fn put_multipart_opts(
&self,
location: &Path,
@@ -403,33 +387,49 @@ impl ArrowObjectStore for OpendalStore {
})
}
async fn get_range(
async fn get_ranges(
&self,
location: &Path,
range: Range<u64>,
) -> datafusion_object_store::Result<Bytes> {
ranges: &[Range<u64>],
) -> datafusion_object_store::Result<Vec<Bytes>> {
if ranges.is_empty() {
return Ok(Vec::new());
}
let raw_location = percent_decode_path(location.as_ref());
let reader = self
.inner
.reader_with(&raw_location)
.reader_with(raw_location.as_ref())
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
let buffers = reader
.fetch(ranges.to_vec())
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
reader
.read(range.start..range.end)
.await
.map(|buf| buf.to_bytes())
.map_err(|err| format_object_store_error(err, location.as_ref()))
Ok(buffers.into_iter().map(|buf| buf.to_bytes()).collect())
}
async fn delete(&self, location: &Path) -> datafusion_object_store::Result<()> {
let decoded_location = percent_decode_path(location.as_ref());
self.inner
.delete(&decoded_location)
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
Ok(())
fn delete_stream(
&self,
locations: BoxStream<'static, datafusion_object_store::Result<Path>>,
) -> BoxStream<'static, datafusion_object_store::Result<Path>> {
let this = self.clone();
locations
.map(move |location| {
let this = this.clone();
async move {
let location = location?;
let decoded_location = percent_decode_path(location.as_ref());
this.inner
.delete(&decoded_location)
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
Ok(location)
}
})
.buffered(10)
.boxed()
}
fn list(
@@ -568,28 +568,14 @@ impl ArrowObjectStore for OpendalStore {
})
}
async fn copy(&self, from: &Path, to: &Path) -> datafusion_object_store::Result<()> {
self.copy_request(from, to, false).await
}
async fn rename(&self, from: &Path, to: &Path) -> datafusion_object_store::Result<()> {
self.inner
.rename(
&percent_decode_path(from.as_ref()),
&percent_decode_path(to.as_ref()),
)
.await
.map_err(|err| format_object_store_error(err, from.as_ref()))?;
Ok(())
}
async fn copy_if_not_exists(
async fn copy_opts(
&self,
from: &Path,
to: &Path,
options: CopyOptions,
) -> datafusion_object_store::Result<()> {
self.copy_request(from, to, true).await
let if_not_exists = options.mode == CopyMode::Create;
self.copy_request(from, to, if_not_exists).await
}
}
@@ -731,7 +717,9 @@ mod tests {
use bytes::Bytes;
use datafusion_object_store::path::Path;
use datafusion_object_store::{ObjectStore as ArrowObjectStore, WriteMultipart};
use datafusion_object_store::{
ObjectStore as ArrowObjectStore, ObjectStoreExt, WriteMultipart,
};
use opendal::{Operator, services};
use rand::{Rng, RngCore};
@@ -1003,7 +991,7 @@ mod tests {
}
#[tokio::test]
async fn test_get_range_no_stat() {
async fn test_get_ranges_no_stat() {
use std::sync::atomic::{AtomicUsize, Ordering};
// Create a stat counter and operator with tracking layer
@@ -1022,13 +1010,17 @@ mod tests {
// Reset counter after put
stat_count.store(0, Ordering::SeqCst);
// Test 1: get_range should NOT call stat()
let ret = store.get_range(&location, 0..5).await.unwrap();
assert_eq!(Bytes::from_static(b"Hello"), ret);
// Test 1: get_ranges should NOT call stat()
let range = 0..5;
let ret = store
.get_ranges(&location, std::slice::from_ref(&range))
.await
.unwrap();
assert_eq!(vec![Bytes::from_static(b"Hello")], ret);
assert_eq!(
stat_count.load(Ordering::SeqCst),
0,
"get_range should not call stat()"
"get_ranges should not call stat()"
);
// Reset counter

View File

@@ -16,10 +16,10 @@ use std::env;
use std::sync::Arc;
use anyhow::Result;
use arrow_object_store::path::Path;
use arrow_object_store::{ObjectStore as ArrowObjectStore, ObjectStoreExt};
use bytes::Bytes;
use common_telemetry::info;
use datafusion_object_store::path::Path;
use datafusion_object_store::{ObjectStore as ArrowObjectStore, ObjectStoreExt};
use futures::TryStreamExt;
use object_store::ObjectStore;
use object_store::services::{Fs, S3};

File diff suppressed because it is too large Load Diff

View File

@@ -24,7 +24,7 @@ api.workspace = true
arrow.workspace = true
arrow-flight.workspace = true
arrow-ipc.workspace = true
arrow-pg = "0.12"
arrow-pg = "0.13"
arrow-schema.workspace = true
async-trait.workspace = true
auth.workspace = true

View File

@@ -605,7 +605,7 @@ mod tests {
.unwrap();
let write_props = WriterProperties::builder()
.set_max_row_group_size(10)
.set_max_row_group_row_count(Some(10))
.build();
let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(write_props)).unwrap();