feat: bump datafusion to 53

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-05-13 10:53:42 +08:00
parent 0d90f7407c
commit 2cc1c72e4f
19 changed files with 582 additions and 799 deletions

577
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -100,17 +100,16 @@ rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
# See for more details: https://github.com/rust-lang/cargo/issues/11329
ahash = { version = "0.8", features = ["compile-time-rng"] }
aquamarine = "0.6"
arrow = { version = "57.3", features = ["prettyprint"] }
arrow-array = { version = "57.3", default-features = false, features = ["chrono-tz"] }
arrow-buffer = "57.3"
arrow-cast = "57.3"
arrow-flight = "57.3"
arrow-ipc = { version = "57.3", default-features = false, features = ["lz4", "zstd"] }
arrow-schema = { version = "57.3", features = ["serde"] }
arrow = { version = "58.3", features = ["prettyprint"] }
arrow-array = { version = "58.3", default-features = false, features = ["chrono-tz"] }
arrow-buffer = "58.3"
arrow-cast = "58.3"
arrow-flight = "58.3"
arrow-ipc = { version = "58.3", default-features = false, features = ["lz4", "zstd"] }
arrow-schema = { version = "58.3", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
# Remember to update axum-extra, axum-macros when updating axum
arrow_object_store = { package = "object_store", version = "0.13.2" }
axum = "0.8"
axum-extra = "0.10"
axum-macros = "0.5"
@@ -128,21 +127,21 @@ const_format = "0.2"
criterion = "0.7"
crossbeam-utils = "0.8"
dashmap = "6.1"
datafusion = "=52.1"
datafusion-common = "=52.1"
datafusion-datasource = "=52.1"
datafusion-expr = "=52.1"
datafusion-functions = "=52.1"
datafusion-functions-aggregate-common = "=52.1"
datafusion-functions-window-common = "=52.1"
datafusion-optimizer = "=52.1"
datafusion-orc = "0.7"
datafusion-pg-catalog = "0.15.1"
datafusion-physical-expr = "=52.1"
datafusion-physical-plan = "=52.1"
datafusion-sql = "=52.1"
datafusion-substrait = "=52.1"
datafusion_object_store = { package = "object_store", version = "0.12.5" }
datafusion = "=53.1.0"
datafusion-common = "=53.1.0"
datafusion-datasource = "=53.1.0"
datafusion-expr = "=53.1.0"
datafusion-functions = "=53.1.0"
datafusion-functions-aggregate-common = "=53.1.0"
datafusion-functions-window-common = "=53.1.0"
datafusion-optimizer = "=53.1.0"
datafusion-orc = { git = "https://github.com/datafusion-contrib/datafusion-orc.git", rev = "73a7036a68dcc277b76d4e29d1d9a4a1fffae70c" }
datafusion-pg-catalog = "0.16"
datafusion-physical-expr = "=53.1.0"
datafusion-physical-plan = "=53.1.0"
datafusion-sql = "=53.1.0"
datafusion-substrait = "=53.1.0"
datafusion_object_store = { package = "object_store", version = "0.13.2" }
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"
@@ -191,7 +190,7 @@ otel-arrow-rust = { git = "https://github.com/GreptimeTeam/otel-arrow", rev = "5
"server",
] }
parking_lot = "0.12"
parquet = { version = "57.3", default-features = false, features = ["arrow", "async", "object_store"] }
parquet = { version = "58.3", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
pin-project = "1.0"
pretty_assertions = "1.4.0"
@@ -337,19 +336,38 @@ git = "https://github.com/GreptimeTeam/greptime-meter.git"
rev = "5618e779cf2bb4755b499c630fba4c35e91898cb"
[patch.crates-io]
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" }
datafusion-functions-window-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" }
datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" }
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" }
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-catalog = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-catalog-listing = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-common-runtime = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-datasource-arrow = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-datasource-csv = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-datasource-json = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-datasource-parquet = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-doc = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-execution = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-functions-aggregate = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-functions-nested = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-functions-table = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-functions-window = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-functions-window-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-macros = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-physical-expr-adapter = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-physical-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-pruning = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-session = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "9c1ed8d9242408aad0a6d444c7c339bcb62f9be4" }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "2aefa08a8d69c96eec2d6d6703598a009bba6e4c" } # on branch v0.61.x
[profile.release]

View File

@@ -28,11 +28,12 @@ common-runtime.workspace = true
common-telemetry.workspace = true
datafusion.workspace = true
datafusion-datasource.workspace = true
datafusion-orc.workspace = true
datatypes.workspace = true
futures.workspace = true
lazy_static.workspace = true
object-store.workspace = true
orc-rust = { version = "0.7", default-features = false, features = ["async"] }
orc-rust = { version = "0.8", default-features = false, features = ["async"] }
parquet.workspace = true
paste.workspace = true
regex.workspace = true
@@ -45,4 +46,3 @@ url.workspace = true
[dev-dependencies]
common-test-util.workspace = true
datafusion-orc.workspace = true

View File

@@ -15,6 +15,7 @@
use arrow_schema::Schema;
use async_trait::async_trait;
use bytes::Bytes;
pub use datafusion_orc::OrcSource;
use futures::FutureExt;
use futures::future::BoxFuture;
use object_store::ObjectStore;

View File

@@ -176,11 +176,14 @@ impl AsyncFileReader for LazyParquetFileReader {
.map_err(|e| ParquetError::External(Box::new(e)))?;
let metadata_opts = options.map(|o| o.metadata_options().clone());
let column_index_policy =
options.map_or(PageIndexPolicy::Skip, |o| o.column_index_policy());
let offset_index_policy =
options.map_or(PageIndexPolicy::Skip, |o| o.offset_index_policy());
let metadata_reader = ParquetMetaDataReader::new()
.with_metadata_options(metadata_opts)
.with_page_index_policy(PageIndexPolicy::from(
options.is_some_and(|o| o.page_index()),
))
.with_column_index_policy(column_index_policy)
.with_offset_index_policy(offset_index_policy)
.with_prefetch_hint(self.metadata_size_hint);
let metadata = metadata_reader

View File

@@ -26,11 +26,11 @@ use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::prelude::SessionContext;
use datafusion_orc::OrcSource;
use futures::StreamExt;
use object_store::ObjectStore;
use super::FORMAT_TYPE;
use crate::file_format::orc::OrcSource;
use crate::file_format::parquet::DefaultParquetFileReaderFactory;
use crate::file_format::{FileFormat, Format, OrcFormat};
use crate::test_util::{basic_schema_with_time_format, scan_config, test_basic_schema, test_store};

View File

@@ -26,7 +26,6 @@ common-test-util = { workspace = true, optional = true }
common-time.workspace = true
datafusion.workspace = true
datafusion-expr.workspace = true
datafusion-orc.workspace = true
datatypes.workspace = true
futures.workspace = true
object-store.workspace = true

View File

@@ -16,6 +16,7 @@ use std::sync::Arc;
use common_datasource::file_format::Format;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::orc::OrcSource;
use common_datasource::file_format::parquet::DefaultParquetFileReaderFactory;
use datafusion::common::ToDFSchema;
use datafusion::config::CsvOptions;
@@ -34,7 +35,6 @@ use datafusion::physical_plan::{
use datafusion::prelude::SessionContext;
use datafusion_expr::expr::Expr;
use datafusion_expr::utils::conjunction;
use datafusion_orc::OrcSource;
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
use snafu::ResultExt;

View File

@@ -127,7 +127,10 @@ pub(crate) fn assert_parquet_metadata_equal(x: Arc<ParquetMetaData>, y: Arc<Parq
.statistics()
.cloned()
.map(unset_min_max_backwards_compatible_flag);
let mut col_builder = col.into_builder().clear_statistics();
let mut col_builder = col
.into_builder()
.clear_statistics()
.clear_page_encoding_stats();
if let Some(stats) = stats {
col_builder = col_builder.set_statistics(stats);
}

View File

@@ -1202,7 +1202,7 @@ impl BulkPartEncoder {
WriterProperties::builder()
.set_key_value_metadata(Some(vec![key_value_meta]))
.set_write_batch_size(row_group_size)
.set_max_row_group_size(row_group_size)
.set_max_row_group_row_count(Some(row_group_size))
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_column_index_truncate_length(None)
.set_statistics_truncate_length(None)

View File

@@ -756,7 +756,7 @@ impl<'a> DataPartEncoder<'a> {
fn writer_props(self) -> WriterProperties {
let mut builder = WriterProperties::builder();
if let Some(row_group_size) = self.row_group_size {
builder = builder.set_max_row_group_size(row_group_size)
builder = builder.set_max_row_group_row_count(Some(row_group_size))
}
let ts_col = ColumnPath::new(vec![self.timestamp_column_name]);

View File

@@ -590,7 +590,7 @@ mod tests {
.set_key_value_metadata(Some(vec![key_value_meta]))
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_encoding(Encoding::PLAIN)
.set_max_row_group_size(write_opts.row_group_size);
.set_max_row_group_row_count(Some(write_opts.row_group_size));
let writer_props = props_builder.build();

View File

@@ -425,7 +425,7 @@ where
.set_key_value_metadata(Some(vec![key_value_meta]))
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_encoding(Encoding::PLAIN)
.set_max_row_group_size(opts.row_group_size)
.set_max_row_group_row_count(Some(opts.row_group_size))
.set_column_index_truncate_length(None)
.set_statistics_truncate_length(None);

View File

@@ -43,7 +43,6 @@ uuid.workspace = true
[dev-dependencies]
anyhow = "1.0"
arrow_object_store.workspace = true
common-telemetry.workspace = true
common-test-util.workspace = true
object_store_opendal.workspace = true

View File

@@ -23,13 +23,13 @@ use async_trait::async_trait;
use bytes::Bytes;
use datafusion_object_store::path::Path;
use datafusion_object_store::{
Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPayload, ListResult,
MultipartUpload, ObjectMeta, ObjectStore as ArrowObjectStore, PutMode, PutMultipartOptions,
PutOptions, PutPayload, PutResult, UploadPart,
Attribute, Attributes, CopyMode, CopyOptions, GetOptions, GetRange, GetResult,
GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore as ArrowObjectStore,
PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, UploadPart,
};
use futures::stream::BoxStream;
use futures::{FutureExt, StreamExt, TryStreamExt};
use opendal::options::CopyOptions;
use opendal::options::CopyOptions as OpendalCopyOptions;
use opendal::raw::percent_decode_path;
use opendal::{Buffer, Operator, OperatorInfo, Writer};
use tokio::sync::{Mutex, oneshot};
@@ -107,7 +107,7 @@ impl OpendalStore {
to: &Path,
if_not_exists: bool,
) -> datafusion_object_store::Result<()> {
let mut copy_options = CopyOptions::default();
let mut copy_options = OpendalCopyOptions::default();
if if_not_exists {
copy_options.if_not_exists = true;
}
@@ -212,22 +212,6 @@ impl ArrowObjectStore for OpendalStore {
Ok(PutResult { e_tag, version })
}
async fn put_multipart(
&self,
location: &Path,
) -> datafusion_object_store::Result<Box<dyn MultipartUpload>> {
let decoded_location = percent_decode_path(location.as_ref());
let writer = self
.inner
.writer_with(&decoded_location)
.concurrent(8)
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
let upload = OpendalMultipartUpload::new(writer, location.clone());
Ok(Box::new(upload))
}
async fn put_multipart_opts(
&self,
location: &Path,
@@ -403,33 +387,49 @@ impl ArrowObjectStore for OpendalStore {
})
}
async fn get_range(
async fn get_ranges(
&self,
location: &Path,
range: Range<u64>,
) -> datafusion_object_store::Result<Bytes> {
ranges: &[Range<u64>],
) -> datafusion_object_store::Result<Vec<Bytes>> {
if ranges.is_empty() {
return Ok(Vec::new());
}
let raw_location = percent_decode_path(location.as_ref());
let reader = self
.inner
.reader_with(&raw_location)
.reader_with(raw_location.as_ref())
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
let buffers = reader
.fetch(ranges.to_vec())
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
reader
.read(range.start..range.end)
.await
.map(|buf| buf.to_bytes())
.map_err(|err| format_object_store_error(err, location.as_ref()))
Ok(buffers.into_iter().map(|buf| buf.to_bytes()).collect())
}
async fn delete(&self, location: &Path) -> datafusion_object_store::Result<()> {
let decoded_location = percent_decode_path(location.as_ref());
self.inner
.delete(&decoded_location)
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
Ok(())
fn delete_stream(
&self,
locations: BoxStream<'static, datafusion_object_store::Result<Path>>,
) -> BoxStream<'static, datafusion_object_store::Result<Path>> {
let this = self.clone();
locations
.map(move |location| {
let this = this.clone();
async move {
let location = location?;
let decoded_location = percent_decode_path(location.as_ref());
this.inner
.delete(&decoded_location)
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
Ok(location)
}
})
.buffered(10)
.boxed()
}
fn list(
@@ -568,28 +568,14 @@ impl ArrowObjectStore for OpendalStore {
})
}
async fn copy(&self, from: &Path, to: &Path) -> datafusion_object_store::Result<()> {
self.copy_request(from, to, false).await
}
async fn rename(&self, from: &Path, to: &Path) -> datafusion_object_store::Result<()> {
self.inner
.rename(
&percent_decode_path(from.as_ref()),
&percent_decode_path(to.as_ref()),
)
.await
.map_err(|err| format_object_store_error(err, from.as_ref()))?;
Ok(())
}
async fn copy_if_not_exists(
async fn copy_opts(
&self,
from: &Path,
to: &Path,
options: CopyOptions,
) -> datafusion_object_store::Result<()> {
self.copy_request(from, to, true).await
let if_not_exists = options.mode == CopyMode::Create;
self.copy_request(from, to, if_not_exists).await
}
}
@@ -731,7 +717,9 @@ mod tests {
use bytes::Bytes;
use datafusion_object_store::path::Path;
use datafusion_object_store::{ObjectStore as ArrowObjectStore, WriteMultipart};
use datafusion_object_store::{
ObjectStore as ArrowObjectStore, ObjectStoreExt, WriteMultipart,
};
use opendal::{Operator, services};
use rand::{Rng, RngCore};
@@ -1003,7 +991,7 @@ mod tests {
}
#[tokio::test]
async fn test_get_range_no_stat() {
async fn test_get_ranges_no_stat() {
use std::sync::atomic::{AtomicUsize, Ordering};
// Create a stat counter and operator with tracking layer
@@ -1022,13 +1010,17 @@ mod tests {
// Reset counter after put
stat_count.store(0, Ordering::SeqCst);
// Test 1: get_range should NOT call stat()
let ret = store.get_range(&location, 0..5).await.unwrap();
assert_eq!(Bytes::from_static(b"Hello"), ret);
// Test 1: get_ranges should NOT call stat()
let range = 0..5;
let ret = store
.get_ranges(&location, std::slice::from_ref(&range))
.await
.unwrap();
assert_eq!(vec![Bytes::from_static(b"Hello")], ret);
assert_eq!(
stat_count.load(Ordering::SeqCst),
0,
"get_range should not call stat()"
"get_ranges should not call stat()"
);
// Reset counter

View File

@@ -16,10 +16,10 @@ use std::env;
use std::sync::Arc;
use anyhow::Result;
use arrow_object_store::path::Path;
use arrow_object_store::{ObjectStore as ArrowObjectStore, ObjectStoreExt};
use bytes::Bytes;
use common_telemetry::info;
use datafusion_object_store::path::Path;
use datafusion_object_store::{ObjectStore as ArrowObjectStore, ObjectStoreExt};
use futures::TryStreamExt;
use object_store::ObjectStore;
use object_store::services::{Fs, S3};

View File

@@ -23,16 +23,11 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use arrow::array::{
ArrayRef, AsArray, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray,
};
use arrow::array::{Array, ArrayRef};
use arrow::compute::{concat, concat_batches, take_record_batch};
use arrow_schema::{Schema, SchemaRef};
use arrow_schema::SchemaRef;
use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
use common_telemetry::warn;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datafusion::common::arrow::compute::sort_to_indices;
use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion::execution::{RecordBatchStream, TaskContext};
@@ -42,16 +37,13 @@ use datafusion::physical_plan::filter_pushdown::{
};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, TopK,
TopKDynamicFilters,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{DataFusionError, internal_err};
use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, lit};
use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use futures::{Stream, StreamExt};
use futures::Stream;
use itertools::Itertools;
use parking_lot::RwLock;
use snafu::location;
use store_api::region_engine::PartitionRange;
@@ -115,10 +107,7 @@ pub struct PartSortExec {
metrics: ExecutionPlanMetricsSet,
partition_ranges: Vec<Vec<PartitionRange>>,
properties: Arc<PlanProperties>,
/// Filter matching the state of the sort for dynamic filter pushdown.
/// If `limit` is `Some`, this will also be set and a TopK operator may be used.
/// If `limit` is `None`, this will be `None`.
filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
dynamic_filter: Option<Arc<DynamicFilterPhysicalExpr>>,
}
impl PartSortExec {
@@ -139,9 +128,7 @@ impl PartSortExec {
properties.boundedness,
));
let filter = limit
.is_some()
.then(|| Self::create_filter(expression.expr.clone()));
let dynamic_filter = Self::new_dynamic_filter(&expression, limit);
Ok(Self {
expression,
@@ -150,15 +137,20 @@ impl PartSortExec {
metrics,
partition_ranges,
properties,
filter,
dynamic_filter,
})
}
/// Add or reset `self.filter` to a new `TopKDynamicFilters`.
fn create_filter(expr: Arc<dyn PhysicalExpr>) -> Arc<RwLock<TopKDynamicFilters>> {
Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
DynamicFilterPhysicalExpr::new(vec![expr], lit(true)),
))))
fn new_dynamic_filter(
expression: &PhysicalSortExpr,
limit: Option<usize>,
) -> Option<Arc<DynamicFilterPhysicalExpr>> {
limit.map(|_| {
Arc::new(DynamicFilterPhysicalExpr::new(
vec![expression.expr.clone()],
lit(true),
))
})
}
pub fn to_stream(
@@ -185,7 +177,6 @@ impl PartSortExec {
input_stream,
self.partition_ranges[partition].clone(),
partition,
self.filter.clone(),
)?) as _;
Ok(df_stream)
@@ -276,28 +267,24 @@ impl ExecutionPlan for PartSortExec {
&self,
phase: FilterPushdownPhase,
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
_config: &datafusion::config::ConfigOptions,
config: &datafusion::config::ConfigOptions,
) -> datafusion_common::Result<FilterDescription> {
if !matches!(phase, FilterPushdownPhase::Post) {
return FilterDescription::from_children(parent_filters, &self.children());
}
let mut child = ChildFilterDescription::from_child(&parent_filters, &self.input)?;
if let Some(filter) = &self.filter {
child = child.with_self_filter(filter.read().expr());
if let Some(filter) = &self.dynamic_filter
&& config.optimizer.enable_topk_dynamic_filter_pushdown
{
let filter: Arc<dyn PhysicalExpr> = filter.clone();
child = child.with_self_filter(filter);
}
Ok(FilterDescription::new().with_child(child))
}
fn reset_state(self: Arc<Self>) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
// shared dynamic filter needs to be reset
let new_filter = self
.limit
.is_some()
.then(|| Self::create_filter(self.expression.expr.clone()));
let dynamic_filter = Self::new_dynamic_filter(&self.expression, self.limit);
Ok(Arc::new(Self {
expression: self.expression.clone(),
limit: self.limit,
@@ -305,25 +292,30 @@ impl ExecutionPlan for PartSortExec {
metrics: self.metrics.clone(),
partition_ranges: self.partition_ranges.clone(),
properties: self.properties.clone(),
filter: new_filter,
dynamic_filter,
}))
}
}
enum PartSortBuffer {
All(Vec<DfRecordBatch>),
/// TopK buffer with row count.
///
/// Given this heap only keeps k element, the capacity of this buffer
/// is not accurate, and is only used for empty check.
Top(TopK, usize),
}
enum TopKThreshold {
Null,
Value(i64),
}
impl PartSortBuffer {
pub fn is_empty(&self) -> bool {
match self {
PartSortBuffer::All(v) => v.is_empty(),
PartSortBuffer::Top(_, cnt) => *cnt == 0,
}
}
pub fn num_rows(&self) -> usize {
match self {
PartSortBuffer::All(v) => v.iter().map(|batch| batch.num_rows()).sum(),
}
}
}
@@ -343,16 +335,12 @@ struct PartSortStream {
cur_part_idx: usize,
evaluating_batch: Option<DfRecordBatch>,
metrics: BaselineMetrics,
context: Arc<TaskContext>,
root_metrics: ExecutionPlanMetricsSet,
dynamic_filter: Option<Arc<DynamicFilterPhysicalExpr>>,
/// Groups of ranges by primary end: (primary_end, start_idx_inclusive, end_idx_exclusive).
/// Ranges in the same group must be processed together before outputting results.
range_groups: Vec<(Timestamp, usize, usize)>,
/// Current group being processed (index into range_groups).
cur_group_idx: usize,
/// Dynamic Filter for all TopK instance, notice the `PartSortExec`/`PartSortStream`/`TopK` must share the same filter
/// so that updates from each `TopK` can be seen by others(and by the table scan operator).
filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
}
impl PartSortStream {
@@ -363,33 +351,8 @@ impl PartSortStream {
input: DfSendableRecordBatchStream,
partition_ranges: Vec<PartitionRange>,
partition: usize,
filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
) -> datafusion_common::Result<Self> {
let buffer = if let Some(limit) = limit {
let Some(filter) = filter.clone() else {
return internal_err!(
"TopKDynamicFilters must be provided when limit is set at {}",
snafu::location!()
);
};
PartSortBuffer::Top(
TopK::try_new(
partition,
sort.schema().clone(),
vec![],
[sort.expression.clone()].into(),
limit,
context.session_config().batch_size(),
context.runtime_env(),
&sort.metrics,
filter.clone(),
)?,
0,
)
} else {
PartSortBuffer::All(Vec::new())
};
let buffer = PartSortBuffer::All(Vec::new());
// Compute range groups by primary end
let descending = sort.expression.options.descending;
@@ -409,11 +372,9 @@ impl PartSortStream {
cur_part_idx: 0,
evaluating_batch: None,
metrics: BaselineMetrics::new(&sort.metrics, partition),
context,
root_metrics: sort.metrics.clone(),
dynamic_filter: sort.dynamic_filter.clone(),
range_groups,
cur_group_idx: 0,
filter,
})
}
}
@@ -456,6 +417,20 @@ macro_rules! array_check_helper {
}};
}
macro_rules! threshold_helper {
($t:ty, $unit:expr, $arr:expr, $threshold_idx:expr) => {{
let arr = $arr
.as_any()
.downcast_ref::<arrow::array::PrimitiveArray<$t>>()
.unwrap();
if arr.is_null($threshold_idx) {
TopKThreshold::Null
} else {
TopKThreshold::Value(arr.value($threshold_idx))
}
}};
}
impl PartSortStream {
/// check whether the sort column's min/max value is within the current group's effective range.
/// For group-based processing, data from multiple ranges with the same primary end
@@ -535,95 +510,96 @@ impl PartSortStream {
fn push_buffer(&mut self, batch: DfRecordBatch) -> datafusion_common::Result<()> {
match &mut self.buffer {
PartSortBuffer::All(v) => v.push(batch),
PartSortBuffer::Top(top, cnt) => {
*cnt += batch.num_rows();
top.insert_batch(batch)?;
}
}
Ok(())
}
/// Stop read earlier when current group do not overlap with any of those next group
/// If not overlap, we can stop read further input as current top k is final
/// Use dynamic filter to evaluate the next group's primary end
fn can_stop_early(&mut self, schema: &Arc<Schema>) -> datafusion_common::Result<bool> {
let topk_cnt = match &self.buffer {
PartSortBuffer::Top(_, cnt) => *cnt,
_ => return Ok(false),
fn topk_threshold(
&self,
sort_data_type: &arrow_schema::DataType,
) -> datafusion_common::Result<Option<TopKThreshold>> {
let Some(limit) = self.limit else {
return Ok(None);
};
// not fulfill topk yet
if Some(topk_cnt) < self.limit {
if self.buffer.num_rows() < limit {
return Ok(None);
}
let PartSortBuffer::All(buffer) = &self.buffer;
let mut sort_columns = Vec::with_capacity(buffer.len());
let mut opt = None;
for batch in buffer {
let sort_column = self.expression.evaluate_to_sort_column(batch)?;
opt = opt.or(sort_column.options);
sort_columns.push(sort_column.values);
}
let sort_column =
concat(&sort_columns.iter().map(|a| a.as_ref()).collect_vec()).map_err(|e| {
DataFusionError::ArrowError(
Box::new(e),
Some(format!("Fail to concat sort columns at {}", location!())),
)
})?;
let indices = sort_to_indices(&sort_column, opt, Some(limit)).map_err(|e| {
DataFusionError::ArrowError(
Box::new(e),
Some(format!("Fail to sort to indices at {}", location!())),
)
})?;
if indices.len() < limit {
return Ok(None);
}
let threshold_idx = indices.value(indices.len() - 1) as usize;
let threshold = downcast_ts_array!(
sort_data_type => (threshold_helper, sort_column, threshold_idx),
_ => internal_err!(
"Unsupported data type for sort column: {:?}",
sort_data_type
)?,
);
Ok(Some(threshold))
}
/// Returns true when all rows in the next group are guaranteed to be worse
/// than the current top-k threshold.
fn can_stop_before_group(
&self,
group_idx: usize,
sort_data_type: &arrow_schema::DataType,
) -> datafusion_common::Result<bool> {
if group_idx >= self.range_groups.len() {
return Ok(false);
}
let next_group_primary_end = if self.cur_group_idx + 1 < self.range_groups.len() {
self.range_groups[self.cur_group_idx + 1].0
} else {
// no next group
let Some(threshold) = self.topk_threshold(sort_data_type)? else {
return Ok(false);
};
// dyn filter is updated based on the last value of topk heap("threshold")
// it's a max-heap for a ASC TopK operator
// so can use dyn filter to prune data range
let filter = self
.filter
.as_ref()
.expect("TopKDynamicFilters must be provided when limit is set");
let filter = filter.read().expr().current()?;
let mut ts_index = None;
// invariant: the filter must contain only the same column expr that's time index column
let filter = filter
.transform_down(|c| {
// rewrite all column's index as 0
if let Some(column) = c.as_any().downcast_ref::<Column>() {
ts_index = Some(column.index());
Ok(Transformed::yes(
Arc::new(Column::new(column.name(), 0)) as Arc<dyn PhysicalExpr>
))
let (_, start_idx, _) = self.range_groups[group_idx];
let next_range =
project_partition_range_for_sort(self.partition_ranges[start_idx], sort_data_type)?;
let descending = self.expression.options.descending;
let next_primary = get_primary_end(&next_range, descending).value();
let can_stop = match threshold {
TopKThreshold::Null => self.expression.options.nulls_first,
TopKThreshold::Value(value) => {
if descending {
value >= next_primary
} else {
Ok(Transformed::no(c))
value < next_primary
}
})?
.data;
let Some(ts_index) = ts_index else {
return Ok(false); // dyn filter is still true, cannot decide, continue read
}
};
let field = if schema.fields().len() <= ts_index {
warn!(
"Schema mismatch when evaluating dynamic filter for PartSortExec at {}, schema: {:?}, ts_index: {}",
self.partition, schema, ts_index
);
return Ok(false); // schema mismatch, cannot decide, continue read
} else {
schema.field(ts_index)
};
let schema = Arc::new(Schema::new(vec![field.clone()]));
// convert next_group_primary_end to array&filter, if eval to false, means no overlap, can stop early
let primary_end_array = match next_group_primary_end.unit() {
TimeUnit::Second => Arc::new(TimestampSecondArray::from(vec![
next_group_primary_end.value(),
])) as ArrayRef,
TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from(vec![
next_group_primary_end.value(),
])) as ArrayRef,
TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from(vec![
next_group_primary_end.value(),
])) as ArrayRef,
TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from(vec![
next_group_primary_end.value(),
])) as ArrayRef,
};
let primary_end_batch = DfRecordBatch::try_new(schema, vec![primary_end_array])?;
let res = filter.evaluate(&primary_end_batch)?;
let array = res.into_array(primary_end_batch.num_rows())?;
let filter = array.as_boolean().clone();
let overlap = filter.iter().next().flatten();
if let Some(false) = overlap {
Ok(true)
} else {
Ok(false)
}
Ok(can_stop)
}
/// Check if the given partition index is within the current group.
@@ -685,17 +661,13 @@ impl PartSortStream {
fn sort_buffer(&mut self) -> datafusion_common::Result<DfRecordBatch> {
match &mut self.buffer {
PartSortBuffer::All(_) => self.sort_all_buffer(),
PartSortBuffer::Top(_, _) => self.sort_top_buffer(),
}
}
/// Internal method for sorting `All` buffer (without limit).
fn sort_all_buffer(&mut self) -> datafusion_common::Result<DfRecordBatch> {
let PartSortBuffer::All(buffer) =
std::mem::replace(&mut self.buffer, PartSortBuffer::All(Vec::new()))
else {
unreachable!("buffer type is checked before and should be All variant")
};
std::mem::replace(&mut self.buffer, PartSortBuffer::All(Vec::new()));
if buffer.is_empty() {
return Ok(DfRecordBatch::new_empty(self.schema.clone()));
@@ -726,23 +698,25 @@ impl PartSortStream {
return Ok(DfRecordBatch::new_empty(self.schema.clone()));
}
self.check_in_range(
&sort_column,
(
indices.value(0) as usize,
indices.value(indices.len() - 1) as usize,
),
)
.inspect_err(|_e| {
#[cfg(debug_assertions)]
common_telemetry::error!(
"Fail to check sort column in range at {}, current_idx: {}, num_rows: {}, err: {}",
self.partition,
self.cur_part_idx,
sort_column.len(),
_e
);
})?;
if self.limit.is_none() {
self.check_in_range(
&sort_column,
(
indices.value(0) as usize,
indices.value(indices.len() - 1) as usize,
),
)
.inspect_err(|_e| {
#[cfg(debug_assertions)]
common_telemetry::error!(
"Fail to check sort column in range at {}, current_idx: {}, num_rows: {}, err: {}",
self.partition,
self.cur_part_idx,
sort_column.len(),
_e
);
})?;
}
// reserve memory for the concat input and sorted output
let total_mem: usize = buffer.iter().map(|r| r.get_array_memory_size()).sum();
@@ -774,65 +748,6 @@ impl PartSortStream {
Ok(sorted)
}
/// Internal method for sorting `Top` buffer (with limit).
fn sort_top_buffer(&mut self) -> datafusion_common::Result<DfRecordBatch> {
let Some(filter) = self.filter.clone() else {
return internal_err!(
"TopKDynamicFilters must be provided when sorting with limit at {}",
snafu::location!()
);
};
let new_top_buffer = TopK::try_new(
self.partition,
self.schema().clone(),
vec![],
[self.expression.clone()].into(),
self.limit.unwrap(),
self.context.session_config().batch_size(),
self.context.runtime_env(),
&self.root_metrics,
filter,
)?;
let PartSortBuffer::Top(top_k, _) =
std::mem::replace(&mut self.buffer, PartSortBuffer::Top(new_top_buffer, 0))
else {
unreachable!("buffer type is checked before and should be Top variant")
};
let mut result_stream = top_k.emit()?;
let mut placeholder_ctx = std::task::Context::from_waker(futures::task::noop_waker_ref());
let mut results = vec![];
// according to the current implementation of `TopK`, the result stream will always be ready
loop {
match result_stream.poll_next_unpin(&mut placeholder_ctx) {
Poll::Ready(Some(batch)) => {
let batch = batch?;
results.push(batch);
}
Poll::Pending => {
#[cfg(debug_assertions)]
unreachable!("TopK result stream should always be ready")
}
Poll::Ready(None) => {
break;
}
}
}
let concat_batch = concat_batches(&self.schema, &results).map_err(|e| {
DataFusionError::ArrowError(
Box::new(e),
Some(format!(
"Fail to concat top k result record batch when sorting at {}",
location!()
)),
)
})?;
Ok(concat_batch)
}
/// Sorts current buffer and returns `None` when there is nothing to emit.
fn sorted_buffer_if_non_empty(&mut self) -> datafusion_common::Result<Option<DfRecordBatch>> {
if self.buffer.is_empty() {
@@ -847,6 +762,12 @@ impl PartSortStream {
}
}
fn mark_dynamic_filter_complete(&self) {
if let Some(filter) = &self.dynamic_filter {
filter.mark_complete();
}
}
/// Try to split the input batch if it contains data that exceeds the current partition range.
///
/// When the input batch contains data that exceeds the current partition range, this function
@@ -860,14 +781,14 @@ impl PartSortStream {
///
/// Returns `None` if the input batch is empty or fully within the current partition range
/// (or we're still collecting data within the same group), and `Some(batch)` when we've
/// completed a group and have sorted output. When operating in TopK (limit) mode, this
/// completed a group and have sorted output. When operating in limit mode, this
/// function will not emit intermediate batches; it only prepares state for a single final
/// output.
fn split_batch(
&mut self,
batch: DfRecordBatch,
) -> datafusion_common::Result<Option<DfRecordBatch>> {
if matches!(self.buffer, PartSortBuffer::Top(_, _)) {
if self.limit.is_some() {
self.split_batch_topk(batch)?;
return Ok(None);
}
@@ -875,9 +796,9 @@ impl PartSortStream {
self.split_batch_all(batch)
}
/// Specialized splitting logic for TopK (limit) mode.
/// Specialized splitting logic for limit mode.
///
/// We only emit once when the TopK buffer is fulfilled or when input is fully consumed.
/// We only emit once when input is fully consumed.
/// When the buffer is fulfilled and we are about to enter a new group, we stop consuming
/// further ranges.
fn split_batch_topk(&mut self, batch: DfRecordBatch) -> datafusion_common::Result<()> {
@@ -917,16 +838,12 @@ impl PartSortStream {
// Check if we're still in the same group
let in_same_group = self.is_in_current_group(self.cur_part_idx);
// When TopK is fulfilled and we are switching to a new group, stop consuming further ranges if possible.
// read from topk heap and determine whether we can stop earlier.
if !in_same_group && self.can_stop_early(&batch.schema())? {
self.input_complete = true;
self.evaluating_batch = None;
return Ok(());
}
// Transition to a new group if needed
if !in_same_group {
let next_group_idx = self.cur_group_idx + 1;
if self.can_stop_before_group(next_group_idx, sort_column.data_type())? {
self.input_complete = true;
return Ok(());
}
self.advance_to_next_group();
}
@@ -1027,8 +944,10 @@ impl PartSortStream {
loop {
if self.input_complete {
if let Some(sorted_batch) = self.sorted_buffer_if_non_empty()? {
self.mark_dynamic_filter_complete();
return Poll::Ready(Some(Ok(sorted_batch)));
}
self.mark_dynamic_filter_complete();
return Poll::Ready(None);
}
@@ -1041,8 +960,10 @@ impl PartSortStream {
if self.cur_part_idx >= self.partition_ranges.len() {
// All partitions processed, discard remaining data
if let Some(sorted_batch) = self.sorted_buffer_if_non_empty()? {
self.mark_dynamic_filter_complete();
return Poll::Ready(Some(Ok(sorted_batch)));
}
self.mark_dynamic_filter_complete();
return Poll::Ready(None);
}
@@ -1107,60 +1028,6 @@ mod test {
use super::*;
use crate::test_util::{MockInputExec, new_ts_array};
#[tokio::test]
async fn test_can_stop_early_with_empty_topk_buffer() {
let unit = TimeUnit::Millisecond;
let schema = Arc::new(Schema::new(vec![Field::new(
"ts",
DataType::Timestamp(unit, None),
false,
)]));
// Build a minimal PartSortExec and stream, but inject a dynamic filter that
// always evaluates to false so TopK will filter out all rows internally.
let mock_input = Arc::new(MockInputExec::new(vec![vec![]], schema.clone()));
let exec = PartSortExec::try_new(
PhysicalSortExpr {
expr: Arc::new(Column::new("ts", 0)),
options: SortOptions {
descending: true,
..Default::default()
},
},
Some(3),
vec![vec![]],
mock_input.clone(),
)
.unwrap();
let filter = Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
DynamicFilterPhysicalExpr::new(vec![], lit(false)),
))));
let input_stream = mock_input
.execute(0, Arc::new(TaskContext::default()))
.unwrap();
let mut stream = PartSortStream::new(
Arc::new(TaskContext::default()),
&exec,
Some(3),
input_stream,
vec![],
0,
Some(filter),
)
.unwrap();
// Push 3 rows so the external counter reaches `limit`, while TopK keeps no rows.
let batch = DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![1, 2, 3])])
.unwrap();
stream.push_buffer(batch).unwrap();
// The TopK result buffer is empty, so we cannot determine early-stop.
// Ensure this path returns `Ok(false)` (and, importantly, does not panic).
assert!(!stream.can_stop_early(&schema).unwrap());
}
#[ignore = "hard to gen expected data correctly here, TODO(discord9): fix it later"]
#[tokio::test]
async fn fuzzy_test() {
@@ -2918,88 +2785,4 @@ mod test {
)
.await;
}
/// First group: [0,20), data: [0, 5, 15]
/// Second group: [10, 30), data: [21, 25, 29]
/// after first group, calling early stop manually, and check if filter is updated
#[tokio::test]
async fn test_early_stop_check_update_dyn_filter() {
let unit = TimeUnit::Millisecond;
let schema = Arc::new(Schema::new(vec![Field::new(
"ts",
DataType::Timestamp(unit, None),
false,
)]));
let mock_input = Arc::new(MockInputExec::new(vec![vec![]], schema.clone()));
let exec = PartSortExec::try_new(
PhysicalSortExpr {
expr: Arc::new(Column::new("ts", 0)),
options: SortOptions {
descending: false,
..Default::default()
},
},
Some(3),
vec![vec![
PartitionRange {
start: Timestamp::new(0, unit.into()),
end: Timestamp::new(20, unit.into()),
num_rows: 3,
identifier: 1,
},
PartitionRange {
start: Timestamp::new(10, unit.into()),
end: Timestamp::new(30, unit.into()),
num_rows: 3,
identifier: 1,
},
]],
mock_input.clone(),
)
.unwrap();
let filter = exec.filter.clone().unwrap();
let input_stream = mock_input
.execute(0, Arc::new(TaskContext::default()))
.unwrap();
let mut stream = PartSortStream::new(
Arc::new(TaskContext::default()),
&exec,
Some(3),
input_stream,
vec![],
0,
Some(filter.clone()),
)
.unwrap();
// initially, snapshot_generation is 1
assert_eq!(filter.read().expr().snapshot_generation(), 1);
let batch =
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![0, 5, 15])])
.unwrap();
stream.push_buffer(batch).unwrap();
// after pushing first batch, snapshot_generation is updated to 2
assert_eq!(filter.read().expr().snapshot_generation(), 2);
assert!(!stream.can_stop_early(&schema).unwrap());
// still two as not updated
assert_eq!(filter.read().expr().snapshot_generation(), 2);
let _ = stream.sort_top_buffer().unwrap();
let batch =
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![21, 25, 29])])
.unwrap();
stream.push_buffer(batch).unwrap();
// still two as not updated
assert_eq!(filter.read().expr().snapshot_generation(), 2);
let new = stream.sort_top_buffer().unwrap();
// still two as not updated
assert_eq!(filter.read().expr().snapshot_generation(), 2);
// dyn filter kick in, and filter out all rows >= 15(the filter is rows<15)
assert_eq!(new.num_rows(), 0)
}
}

View File

@@ -24,7 +24,7 @@ api.workspace = true
arrow.workspace = true
arrow-flight.workspace = true
arrow-ipc.workspace = true
arrow-pg = "0.12"
arrow-pg = "0.13"
arrow-schema.workspace = true
async-trait.workspace = true
auth.workspace = true

View File

@@ -605,7 +605,7 @@ mod tests {
.unwrap();
let write_props = WriterProperties::builder()
.set_max_row_group_size(10)
.set_max_row_group_row_count(Some(10))
.build();
let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(write_props)).unwrap();