chore: upgrade DataFusion family

Signed-off-by: luofucong <luofc@foxmail.com>
Author: luofucong
Date: 2026-01-15 19:28:28 +08:00
parent 8566bf1409
commit 8826f690a2
31 changed files with 574 additions and 766 deletions

Cargo.lock (generated): 878 changed lines
File diff suppressed because it is too large.


@@ -100,13 +100,13 @@ rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
# See for more detaiils: https://github.com/rust-lang/cargo/issues/11329
ahash = { version = "0.8", features = ["compile-time-rng"] }
aquamarine = "0.6"
arrow = { version = "57.0", features = ["prettyprint"] }
arrow-array = { version = "57.0", default-features = false, features = ["chrono-tz"] }
arrow-buffer = "57.0"
arrow-cast = "57.0"
arrow-flight = "57.0"
arrow-ipc = { version = "57.0", default-features = false, features = ["lz4", "zstd"] }
arrow-schema = { version = "57.0", features = ["serde"] }
arrow = { version = "57.2", features = ["prettyprint"] }
arrow-array = { version = "57.2", default-features = false, features = ["chrono-tz"] }
arrow-buffer = "57.2"
arrow-cast = "57.2"
arrow-flight = "57.2"
arrow-ipc = { version = "57.2", default-features = false, features = ["lz4", "zstd"] }
arrow-schema = { version = "57.2", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
# Remember to update axum-extra, axum-macros when updating axum
@@ -126,19 +126,19 @@ config = "0.13.0"
const_format = "0.2"
crossbeam-utils = "0.8"
dashmap = "6.1"
datafusion = "51.0"
datafusion-common = "51.0"
datafusion-datasource = "51.0"
datafusion-expr = "51.0"
datafusion-functions = "51.0"
datafusion-functions-aggregate-common = "51.0"
datafusion-optimizer = "51.0"
datafusion-orc = { git = "https://github.com/GreptimeTeam/datafusion-orc.git", rev = "35f2e04bf81f2ab7b6f86c0450d6a77b7098d43e" }
datafusion-pg-catalog = "0.13"
datafusion-physical-expr = "51.0"
datafusion-physical-plan = "51.0"
datafusion-sql = "51.0"
datafusion-substrait = "51.0"
datafusion = "52.0"
datafusion-common = "52.0"
datafusion-datasource = "52.0"
datafusion-expr = "52.0"
datafusion-functions = "52.0"
datafusion-functions-aggregate-common = "52.0"
datafusion-optimizer = "52.0"
datafusion-orc = "0.7"
datafusion-pg-catalog = "0.14"
datafusion-physical-expr = "52.0"
datafusion-physical-plan = "52.0"
datafusion-sql = "52.0"
datafusion-substrait = "52.0"
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"
@@ -185,7 +185,7 @@ otel-arrow-rust = { git = "https://github.com/GreptimeTeam/otel-arrow", rev = "5
"server",
] }
parking_lot = "0.12"
parquet = { version = "57.0", default-features = false, features = ["arrow", "async", "object_store"] }
parquet = { version = "57.2", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
pin-project = "1.0"
pretty_assertions = "1.4.0"
@@ -224,7 +224,7 @@ simd-json = "0.15"
similar-asserts = "1.6.0"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
sqlparser = { version = "0.59.0", default-features = false, features = ["std", "visitor", "serde"] }
sqlparser = { version = "0.60.0", default-features = false, features = ["std", "visitor", "serde"] }
sqlx = { version = "0.8", default-features = false, features = ["any", "macros", "json", "runtime-tokio-rustls"] }
strum = { version = "0.27", features = ["derive"] }
sysinfo = "0.33"
@@ -323,20 +323,19 @@ git = "https://github.com/GreptimeTeam/greptime-meter.git"
rev = "5618e779cf2bb4755b499c630fba4c35e91898cb"
[patch.crates-io]
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-pg-catalog = { git = "https://github.com/GreptimeTeam/datafusion-postgres.git", rev = "74ac8e2806be6de91ff192b97f64735392539d16" }
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "d7d95a44889e099e32d78e9bad9bc00598faef28" } # on branch v0.59.x
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "dc4129282abc8a74eb1674690115ec899d4e5741" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "dc4129282abc8a74eb1674690115ec899d4e5741" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "dc4129282abc8a74eb1674690115ec899d4e5741" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "dc4129282abc8a74eb1674690115ec899d4e5741" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "dc4129282abc8a74eb1674690115ec899d4e5741" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "dc4129282abc8a74eb1674690115ec899d4e5741" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "dc4129282abc8a74eb1674690115ec899d4e5741" }
datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "dc4129282abc8a74eb1674690115ec899d4e5741" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "dc4129282abc8a74eb1674690115ec899d4e5741" }
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "dc4129282abc8a74eb1674690115ec899d4e5741" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "dc4129282abc8a74eb1674690115ec899d4e5741" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "dc4129282abc8a74eb1674690115ec899d4e5741" }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "e8a3ac44693768965abf7030cd38ebcff53a4d29" } # on branch v0.60.x
[profile.release]
debug = 1


@@ -10,7 +10,7 @@ workspace = true
[dependencies]
arrow.workspace = true
arrow-schema.workspace = true
async-compression = { version = "0.3", features = [
async-compression = { version = "0.4", features = [
"bzip2",
"gzip",
"xz",


@@ -203,8 +203,8 @@ pub enum Error {
error: parquet::errors::ParquetError,
},
#[snafu(display("Failed to build file stream"))]
BuildFileStream {
#[snafu(transparent)]
DataFusion {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
@@ -241,8 +241,9 @@ impl ErrorExt for Error {
| ReadRecordBatch { .. }
| WriteRecordBatch { .. }
| EncodeRecordBatch { .. }
| BuildFileStream { .. }
| OrcReader { .. } => StatusCode::Unexpected,
DataFusion { .. } => StatusCode::Internal,
}
}
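
For readers skimming this hunk: the named `BuildFileStream` variant plus its `.context(...)` call sites are replaced by a single `#[snafu(transparent)]` variant, so DataFusion errors flow through `?` directly. A minimal, self-contained sketch of that pattern, assuming snafu 0.8 and datafusion-common from the workspace versions above (the `forward` helper is illustrative, not from this commit):

```rust
use datafusion_common::DataFusionError;
use snafu::{Location, Snafu};

#[derive(Debug, Snafu)]
pub enum Error {
    // A transparent variant forwards `Display` and `source()` to the wrapped
    // error and gets a generated `From<DataFusionError>` impl.
    #[snafu(transparent)]
    DataFusion {
        #[snafu(source)]
        error: DataFusionError,
        #[snafu(implicit)]
        location: Location,
    },
}

// Any fallible DataFusion call can now be propagated with plain `?`,
// which is why the `.context(error::BuildFileStreamSnafu)` calls disappear below.
fn forward(value: datafusion_common::Result<i64>) -> Result<i64, Error> {
    Ok(value?)
}
```

The same swap appears again further down in the file-engine error type, where `ExtractColumnFromFilter` becomes a transparent `DataFusion` variant.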


@@ -313,16 +313,13 @@ pub async fn file_to_stream(
filename.to_string(),
0,
)]))
.with_projection_indices(projection)
.with_projection_indices(projection)?
.with_file_compression_type(df_compression)
.build();
let store = Arc::new(OpendalStore::new(store.clone()));
let file_opener = file_source
.with_projection(&config)
.create_file_opener(store, &config, 0);
let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new())
.context(error::BuildFileStreamSnafu)?;
let file_opener = file_source.create_file_opener(store, &config, 0)?;
let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new())?;
Ok(Box::pin(stream))
}


@@ -46,7 +46,10 @@ struct Test<'a> {
impl Test<'_> {
async fn run(self, store: &ObjectStore) {
let store = Arc::new(object_store_opendal::OpendalStore::new(store.clone()));
let file_opener = self.file_source.create_file_opener(store, &self.config, 0);
let file_opener = self
.file_source
.create_file_opener(store, &self.config, 0)
.unwrap();
let result = FileStream::new(
&self.config,


@@ -154,11 +154,13 @@ pub async fn setup_stream_to_csv_test(
let config = scan_config(None, origin_path, csv_source.clone());
let size = store.read(origin_path).await.unwrap().len();
let csv_opener = csv_source.create_file_opener(
Arc::new(object_store_opendal::OpendalStore::new(store.clone())),
&config,
0,
);
let csv_opener = csv_source
.create_file_opener(
Arc::new(object_store_opendal::OpendalStore::new(store.clone())),
&config,
0,
)
.unwrap();
let stream = FileStream::new(&config, 0, csv_opener, &ExecutionPlanMetricsSet::new()).unwrap();
let (tmp_store, dir) = test_tmp_store("test_stream_to_csv");


@@ -428,7 +428,7 @@ impl Accumulator for CountHashAccumulator {
&self.random_state,
&mut self.batch_hashes,
)?;
for hash in hashes.as_slice() {
for hash in hashes {
self.values.insert(*hash);
}
Ok(())


@@ -48,7 +48,7 @@ pub fn rename_logical_plan_columns(
plan.schema().qualified_field_from_column(&old_column)?;
for (qualifier, field) in plan.schema().iter() {
if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
if qualifier.eq(&qualifier_rename) && field == field_rename {
projection.push(col(Column::from((qualifier, field))).alias(new_name));
}
}


@@ -178,8 +178,8 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to extract column from filter"))]
ExtractColumnFromFilter {
#[snafu(transparent)]
DataFusion {
#[snafu(source)]
error: DataFusionError,
#[snafu(implicit)]
@@ -225,8 +225,9 @@ impl ErrorExt for Error {
| ManifestExists { .. }
| BuildStream { .. }
| ParquetScanPlan { .. }
| UnexpectedEngine { .. }
| ExtractColumnFromFilter { .. } => StatusCode::Unexpected,
| UnexpectedEngine { .. } => StatusCode::Unexpected,
DataFusion { .. } => StatusCode::Internal,
}
}


@@ -37,10 +37,7 @@ use snafu::{GenerateImplicitData, ResultExt, ensure};
use store_api::storage::ScanRequest;
use self::file_stream::ScanPlanConfig;
use crate::error::{
BuildBackendSnafu, ExtractColumnFromFilterSnafu, ProjectSchemaSnafu,
ProjectionOutOfBoundsSnafu, Result,
};
use crate::error::{BuildBackendSnafu, ProjectSchemaSnafu, ProjectionOutOfBoundsSnafu, Result};
use crate::region::FileRegion;
impl FileRegion {
@@ -126,8 +123,7 @@ impl FileRegion {
let mut aux_column_set = HashSet::new();
for scan_filter in scan_filters {
df_logical_expr_utils::expr_to_columns(scan_filter, &mut aux_column_set)
.context(ExtractColumnFromFilterSnafu)?;
df_logical_expr_utils::expr_to_columns(scan_filter, &mut aux_column_set)?;
let all_file_columns = aux_column_set
.iter()


@@ -56,7 +56,7 @@ fn build_record_batch_stream(
let config =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source.clone())
.with_projection_indices(scan_plan_config.projection.cloned())
.with_projection_indices(scan_plan_config.projection.cloned())?
.with_limit(limit)
.with_file_group(FileGroup::new(files))
.build();
@@ -65,7 +65,7 @@ fn build_record_batch_stream(
scan_plan_config.store.clone(),
));
let file_opener = file_source.create_file_opener(store, &config, 0);
let file_opener = file_source.create_file_opener(store, &config, 0)?;
let stream = FileStream::new(
&config,
0, // partition: hard-code
@@ -146,7 +146,7 @@ fn new_parquet_stream_with_exec_plan(
let file_scan_config =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), Arc::new(parquet_source))
.with_file_group(file_group)
.with_projection_indices(projection.cloned())
.with_projection_indices(projection.cloned())?
.with_limit(*limit)
.build();


@@ -21,7 +21,7 @@ use datatypes::arrow::error::ArrowError;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowGroups, RowSelection};
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use parquet::column::page::{PageIterator, PageReader};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use snafu::ResultExt;
use crate::error;
@@ -103,6 +103,14 @@ impl RowGroups for MemtableRowGroupPageFetcher<'_> {
reader: Some(self.column_page_reader(i)),
}))
}
fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
Box::new(std::iter::once(self.base.row_group_metadata()))
}
fn metadata(&self) -> &ParquetMetaData {
self.base.parquet_metadata()
}
}
impl RowGroupReaderContext for BulkIterContextRef {


@@ -205,7 +205,8 @@ impl ParquetFetchMetrics {
}
pub(crate) struct RowGroupBase<'a> {
metadata: &'a RowGroupMetaData,
parquet_metadata: &'a ParquetMetaData,
row_group_idx: usize,
pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
/// Compressed page of each column.
column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
@@ -225,7 +226,8 @@ impl<'a> RowGroupBase<'a> {
.map(|x| x[row_group_idx].as_slice());
Self {
metadata,
parquet_metadata: parquet_meta,
row_group_idx,
offset_index,
column_chunks: vec![None; metadata.columns().len()],
row_count: metadata.num_rows() as usize,
@@ -244,7 +246,7 @@ impl<'a> RowGroupBase<'a> {
let ranges = self
.column_chunks
.iter()
.zip(self.metadata.columns())
.zip(self.row_group_metadata().columns())
.enumerate()
.filter(|&(idx, (chunk, _chunk_meta))| chunk.is_none() && projection.leaf_included(idx))
.flat_map(|(idx, (_chunk, chunk_meta))| {
@@ -293,8 +295,12 @@ impl<'a> RowGroupBase<'a> {
chunks.push(chunk_data.next().unwrap());
}
let column = self
.parquet_metadata
.row_group(self.row_group_idx)
.column(idx);
*chunk = Some(Arc::new(ColumnChunkData::Sparse {
length: self.metadata.column(idx).byte_range().1 as usize,
length: column.byte_range().1 as usize,
data: offsets.into_iter().zip(chunks).collect(),
}))
}
@@ -307,7 +313,7 @@ impl<'a> RowGroupBase<'a> {
.enumerate()
.filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
.map(|(idx, _chunk)| {
let column = self.metadata.column(idx);
let column = self.row_group_metadata().column(idx);
let (start, length) = column.byte_range();
start..(start + length)
})
@@ -333,7 +339,10 @@ impl<'a> RowGroupBase<'a> {
continue;
};
let column = self.metadata.column(idx);
let column = self
.parquet_metadata
.row_group(self.row_group_idx)
.column(idx);
*chunk = Some(Arc::new(ColumnChunkData::Dense {
offset: column.byte_range().0 as usize,
data,
@@ -360,7 +369,7 @@ impl<'a> RowGroupBase<'a> {
.map(|index| index[col_idx].page_locations.clone());
SerializedPageReader::new(
data.clone(),
self.metadata.column(col_idx),
self.row_group_metadata().column(col_idx),
self.row_count,
page_locations,
)?
@@ -369,6 +378,14 @@ impl<'a> RowGroupBase<'a> {
Ok(page_reader)
}
pub(crate) fn parquet_metadata(&self) -> &ParquetMetaData {
self.parquet_metadata
}
pub(crate) fn row_group_metadata(&self) -> &RowGroupMetaData {
self.parquet_metadata().row_group(self.row_group_idx)
}
}
/// An in-memory collection of column chunks
@@ -599,6 +616,14 @@ impl RowGroups for InMemoryRowGroup<'_> {
reader: Some(Ok(Box::new(page_reader))),
}))
}
fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
Box::new(std::iter::once(self.base.row_group_metadata()))
}
fn metadata(&self) -> &ParquetMetaData {
self.base.parquet_metadata()
}
}
/// An in-memory column chunk


@@ -549,15 +549,10 @@ fn find_primary_keys(
let columns_pk = columns
.iter()
.filter_map(|x| {
if x.options().iter().any(|o| {
matches!(
o.option,
ColumnOption::Unique {
is_primary: true,
..
}
)
}) {
if x.options()
.iter()
.any(|o| matches!(o.option, ColumnOption::PrimaryKey(_)))
{
Some(x.name().value.clone())
} else {
None
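
The rewrite above tracks a breaking change in sqlparser 0.60: `ColumnOption::Unique { is_primary }` is split into `ColumnOption::PrimaryKey(PrimaryKeyConstraint)` and `ColumnOption::Unique(UniqueConstraint)`, so primary-key detection collapses to a single `matches!`. A minimal sketch over a plain sqlparser `ColumnDef` (the `is_primary_key` helper name is illustrative; the later `sql` hunks contain the real equivalents):

```rust
use sqlparser::ast::{ColumnDef, ColumnOption};

// sqlparser 0.59: PRIMARY KEY was `ColumnOption::Unique { is_primary: true, .. }`.
// sqlparser 0.60: it is a dedicated variant carrying a `PrimaryKeyConstraint`.
fn is_primary_key(column: &ColumnDef) -> bool {
    column
        .options
        .iter()
        .any(|def| matches!(def.option, ColumnOption::PrimaryKey(..)))
}
```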


@@ -81,7 +81,7 @@ impl EmptyMetric {
) -> DataFusionResult<Self> {
let qualifier = Some(TableReference::bare(""));
let ts_only_schema = build_ts_only_schema(&time_index_column_name);
let mut fields = vec![(qualifier.clone(), Arc::new(ts_only_schema.field(0).clone()))];
let mut fields = vec![(qualifier.clone(), ts_only_schema.field(0).clone())];
if let Some(field_expr) = &field_expr {
let field_data_type = field_expr.get_type(&ts_only_schema)?;
fields.push((


@@ -129,7 +129,7 @@ impl RangeManipulate {
let mut new_columns = Vec::with_capacity(columns.len() + 1);
for i in 0..columns.len() {
let x = input_schema.qualified_field(i);
new_columns.push((x.0.cloned(), Arc::new(x.1.clone())));
new_columns.push((x.0.cloned(), x.1.clone()));
}
// process time index column


@@ -94,7 +94,7 @@ impl ScalarCalculate {
let qualifier = table_name.map(TableReference::bare);
let schema = DFSchema::new_with_metadata(
vec![
(qualifier.clone(), Arc::new(ts_field)),
(qualifier.clone(), ts_field),
(qualifier, Arc::new(val_field)),
],
input_schema.metadata().clone(),


@@ -14,9 +14,6 @@
//! An extended "array" based on [DictionaryArray].
use std::sync::Arc;
use datafusion::arrow::buffer::NullBuffer;
use datafusion::arrow::datatypes::Field;
use datatypes::arrow::array::{Array, ArrayData, ArrayRef, DictionaryArray, Int64Array};
use datatypes::arrow::datatypes::{DataType, Int64Type};
@@ -225,52 +222,6 @@ impl RangeArray {
}
}
impl Array for RangeArray {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn into_data(self) -> ArrayData {
self.array.into_data()
}
fn to_data(&self) -> ArrayData {
self.array.to_data()
}
fn slice(&self, offset: usize, length: usize) -> ArrayRef {
Arc::new(self.array.slice(offset, length))
}
fn nulls(&self) -> Option<&NullBuffer> {
self.array.nulls()
}
fn data_type(&self) -> &DataType {
self.array.data_type()
}
fn len(&self) -> usize {
self.len()
}
fn is_empty(&self) -> bool {
self.is_empty()
}
fn offset(&self) -> usize {
self.array.offset()
}
fn get_buffer_memory_size(&self) -> usize {
self.array.get_buffer_memory_size()
}
fn get_array_memory_size(&self) -> usize {
self.array.get_array_memory_size()
}
}
impl std::fmt::Debug for RangeArray {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let ranges = self


@@ -46,7 +46,7 @@ pub(crate) struct PatchOptimizerContext {
}
impl OptimizerConfig for PatchOptimizerContext {
fn query_execution_start_time(&self) -> DateTime<Utc> {
fn query_execution_start_time(&self) -> Option<DateTime<Utc>> {
self.inner.query_execution_start_time()
}


@@ -279,7 +279,8 @@ impl Categorizer {
Expr::Unnest(_)
| Expr::GroupingSet(_)
| Expr::Placeholder(_)
| Expr::OuterReferenceColumn(_, _) => Commutativity::Unimplemented,
| Expr::OuterReferenceColumn(_, _)
| Expr::SetComparison(_) => Commutativity::Unimplemented,
}
}


@@ -17,13 +17,12 @@ use std::sync::Arc;
use datafusion::config::ConfigOptions;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion_common::Result as DfResult;
use datafusion_common::tree_node::{Transformed, TreeNode};
/// This is [PhysicalOptimizerRule] to remove duplicate physical plans such as two
/// adjoining [CoalesceBatchesExec] or [RepartitionExec]. They won't have any effect
/// adjoining [RepartitionExec]. They won't have any effect
/// if one runs right after another.
///
/// This rule is expected to be run in the final stage of the optimization process.
@@ -52,9 +51,7 @@ impl RemoveDuplicate {
fn do_optimize(plan: Arc<dyn ExecutionPlan>) -> DfResult<Arc<dyn ExecutionPlan>> {
let result = plan
.transform_down(|plan| {
if plan.as_any().is::<CoalesceBatchesExec>()
|| plan.as_any().is::<RepartitionExec>()
{
if plan.as_any().is::<RepartitionExec>() {
// check child
let child = plan.children()[0].clone();
if child.as_any().type_id() == plan.as_any().type_id() {
@@ -72,49 +69,3 @@ impl RemoveDuplicate {
Ok(result)
}
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use arrow_schema::Schema;
use datafusion::physical_plan::displayable;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion_physical_expr::Partitioning;
use super::*;
#[test]
fn remove_coalesce_batches() {
let empty = Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
let coalesce_batches = Arc::new(CoalesceBatchesExec::new(empty, 1024));
let another_coalesce_batches = Arc::new(CoalesceBatchesExec::new(coalesce_batches, 8192));
let optimized = RemoveDuplicate::do_optimize(another_coalesce_batches).unwrap();
let formatted = displayable(optimized.as_ref()).indent(true).to_string();
let expected = "CoalesceBatchesExec: target_batch_size=8192\
\n EmptyExec\n";
assert_eq!(expected, formatted);
}
#[test]
fn non_continuous_coalesce_batches() {
let empty = Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
let coalesce_batches = Arc::new(CoalesceBatchesExec::new(empty, 1024));
let repartition = Arc::new(
RepartitionExec::try_new(coalesce_batches, Partitioning::UnknownPartitioning(1))
.unwrap(),
);
let another_coalesce_batches = Arc::new(CoalesceBatchesExec::new(repartition, 8192));
let optimized = RemoveDuplicate::do_optimize(another_coalesce_batches).unwrap();
let formatted = displayable(optimized.as_ref()).indent(true).to_string();
let expected = "CoalesceBatchesExec: target_batch_size=8192\
\n RepartitionExec: partitioning=UnknownPartitioning(1), input_partitions=1\
\n CoalesceBatchesExec: target_batch_size=1024\
\n EmptyExec\n";
assert_eq!(expected, formatted);
}
}


@@ -17,7 +17,6 @@ use std::sync::Arc;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::coop::CooperativeExec;
use datafusion::physical_plan::filter::FilterExec;
@@ -80,8 +79,6 @@ impl WindowedSortPhysicalRule {
let preserve_partitioning = sort_exec.preserve_partitioning();
let sort_input = remove_repartition(sort_exec.input().clone())?.data;
let sort_input =
remove_coalesce_batches_exec(sort_input, sort_exec.fetch())?.data;
// Gets scanner info from the input without repartition before filter.
let Some(scanner_info) = fetch_partition_range(sort_input.clone())? else {
@@ -164,7 +161,6 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
let mut time_index = HashSet::new();
let mut alias_map = Vec::new();
let mut tag_columns = None;
let mut is_batch_coalesced = false;
input.transform_up(|plan| {
if plan.as_any().is::<CooperativeExec>() {
@@ -180,16 +176,9 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
partition_ranges = None;
}
if plan.as_any().is::<CoalesceBatchesExec>() {
is_batch_coalesced = true;
}
// only a very limited set of plans can exist between region scan and sort exec
// other plans might make this optimize wrong, so be safe here by limiting it
if !(plan.as_any().is::<ProjectionExec>()
|| plan.as_any().is::<FilterExec>()
|| plan.as_any().is::<CoalesceBatchesExec>())
{
if !(plan.as_any().is::<ProjectionExec>() || plan.as_any().is::<FilterExec>()) {
partition_ranges = None;
}
@@ -218,11 +207,6 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
// Reset time index column.
time_index = HashSet::from([region_scan_exec.time_index()]);
tag_columns = Some(region_scan_exec.tag_columns());
// set distinguish_partition_ranges to true, this is an incorrect workaround
if !is_batch_coalesced {
region_scan_exec.with_distinguish_partition_range(true);
}
}
Ok(Transformed::no(plan))
@@ -260,33 +244,6 @@ fn remove_repartition(
})
}
/// Remove `CoalesceBatchesExec` if the limit is less than the batch size.
///
/// so that if limit is too small we can avoid need to scan for more rows than necessary
fn remove_coalesce_batches_exec(
plan: Arc<dyn ExecutionPlan>,
fetch: Option<usize>,
) -> DataFusionResult<Transformed<Arc<dyn ExecutionPlan>>> {
let Some(fetch) = fetch else {
return Ok(Transformed::no(plan));
};
// Avoid removing multiple coalesce batches
let mut is_done = false;
plan.transform_down(|plan| {
if let Some(coalesce_batches_exec) = plan.as_any().downcast_ref::<CoalesceBatchesExec>() {
let target_batch_size = coalesce_batches_exec.target_batch_size();
if fetch < target_batch_size && !is_done {
is_done = true;
return Ok(Transformed::yes(coalesce_batches_exec.input().clone()));
}
}
Ok(Transformed::no(plan))
})
}
/// Resolves alias of the time index column.
///
/// i.e if a is time index, alias= {a:b, b:c}, then result should be {a, b}(not {a, c}) because projection is not transitive


@@ -414,7 +414,7 @@ impl RangeSelect {
.iter()
.map(|i| {
let f = schema_before_project.qualified_field(*i);
(f.0.cloned(), Arc::new(f.1.clone()))
(f.0.cloned(), f.1.clone())
})
.collect();
Arc::new(DFSchema::new_with_metadata(


@@ -19,7 +19,6 @@ use std::time::Duration;
use arrow_schema::DataType;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use chrono::Utc;
use common_time::interval::{MS_PER_DAY, NANOS_PER_MILLI};
use common_time::timestamp::TimeUnit;
use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp, Timezone};
@@ -28,7 +27,6 @@ use datafusion::prelude::Column;
use datafusion::scalar::ScalarValue;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
use datafusion_common::{DFSchema, DataFusionError, Result as DFResult};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr::WildcardOptions;
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{
@@ -146,8 +144,7 @@ fn evaluate_expr_to_millisecond(args: &[Expr], i: usize, interval_only: bool) ->
if interval_only && !interval_only_in_expr(expr) {
return Err(dispose_parse_error(Some(expr)));
}
let execution_props = ExecutionProps::new().with_query_execution_start_time(Utc::now());
let info = SimplifyContext::new(&execution_props).with_schema(Arc::new(DFSchema::empty()));
let info = SimplifyContext::default().with_current_time();
let simplify_expr = ExprSimplifier::new(info).simplify(expr.clone())?;
match simplify_expr {
Expr::Literal(ScalarValue::TimestampNanosecond(ts_nanos, _), _)
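
The change above (and the matching one in the later hunk that rewrites `parser_expr_to_scalar_value_literal`) drops the hand-built `ExecutionProps` and constructs the simplification context directly; with the DataFusion revision pinned in this commit, `SimplifyContext::default().with_current_time()` supplies the evaluation timestamp. A minimal constant-folding sketch using only imports that already appear in this diff (`fold_constants` is an illustrative helper, not from the commit):

```rust
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
use datafusion_common::Result as DfResult;
use datafusion_expr::Expr;
use datafusion_expr::simplify::SimplifyContext;

// Fold constant sub-expressions (e.g. `now() - INTERVAL '15 seconds'`) using the
// time at which the context is created, instead of a borrowed ExecutionProps.
fn fold_constants(expr: Expr) -> DfResult<Expr> {
    let info = SimplifyContext::default().with_current_time();
    ExprSimplifier::new(info).simplify(expr)
}
```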


@@ -25,7 +25,10 @@ use datatypes::arrow::datatypes::{DataType as ArrowDataType, IntervalUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use snafu::{OptionExt, ResultExt, ensure};
use sqlparser::ast::{ColumnOption, ColumnOptionDef, DataType, Expr};
use sqlparser::ast::{
ColumnOption, ColumnOptionDef, DataType, Expr, KeyOrIndexDisplay, NullsDistinctOption,
PrimaryKeyConstraint, UniqueConstraint,
};
use sqlparser::dialect::keywords::Keyword;
use sqlparser::keywords::ALL_KEYWORDS;
use sqlparser::parser::IsOptional::Mandatory;
@@ -727,15 +730,25 @@ impl<'a> ParserContext<'a> {
parser.parse_expr().context(SyntaxSnafu)?,
)))
} else if parser.parse_keywords(&[Keyword::PRIMARY, Keyword::KEY]) {
Ok(Some(ColumnOption::Unique {
is_primary: true,
Ok(Some(ColumnOption::PrimaryKey(PrimaryKeyConstraint {
name: None,
index_name: None,
index_type: None,
columns: vec![],
index_options: vec![],
characteristics: None,
}))
})))
} else if parser.parse_keyword(Keyword::UNIQUE) {
Ok(Some(ColumnOption::Unique {
is_primary: false,
Ok(Some(ColumnOption::Unique(UniqueConstraint {
name: None,
index_name: None,
index_type_display: KeyOrIndexDisplay::None,
index_type: None,
columns: vec![],
index_options: vec![],
characteristics: None,
}))
nulls_distinct: NullsDistinctOption::None,
})))
} else if parser.parse_keywords(&[Keyword::TIME, Keyword::INDEX]) {
// Use a DialectSpecific option for time index
Ok(Some(ColumnOption::DialectSpecific(vec![


@@ -23,8 +23,11 @@ use crate::statements::statement::Statement;
/// DELETE statement parser implementation
impl ParserContext<'_> {
pub(crate) fn parse_delete(&mut self) -> Result<Statement> {
let _ = self.parser.next_token();
let spstatement = self.parser.parse_delete().context(error::SyntaxSnafu)?;
let token = self.parser.next_token();
let spstatement = self
.parser
.parse_delete(token)
.context(error::SyntaxSnafu)?;
match spstatement {
SpStatement::Delete { .. } => {


@@ -23,8 +23,11 @@ use crate::statements::statement::Statement;
/// INSERT/REPLACE statement parser implementation
impl ParserContext<'_> {
pub(crate) fn parse_insert(&mut self) -> Result<Statement> {
let _ = self.parser.next_token();
let spstatement = self.parser.parse_insert().context(error::SyntaxSnafu)?;
let token = self.parser.next_token();
let spstatement = self
.parser
.parse_insert(token)
.context(error::SyntaxSnafu)?;
match spstatement {
SpStatement::Insert { .. } => {
@@ -38,8 +41,11 @@ impl ParserContext<'_> {
}
pub(crate) fn parse_replace(&mut self) -> Result<Statement> {
let _ = self.parser.next_token();
let spstatement = self.parser.parse_insert().context(error::SyntaxSnafu)?;
let token = self.parser.next_token();
let spstatement = self
.parser
.parse_insert(token)
.context(error::SyntaxSnafu)?;
match spstatement {
SpStatement::Insert(mut insert_stmt) => {


@@ -14,7 +14,6 @@
use std::sync::Arc;
use chrono::Utc;
use datafusion::config::ConfigOptions;
use datafusion::error::Result as DfResult;
use datafusion::execution::SessionStateBuilder;
@@ -22,7 +21,6 @@ use datafusion::execution::context::SessionState;
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor};
use datafusion_common::{DFSchema, ScalarValue};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{AggregateUDF, Expr, ScalarUDF, TableSource, WindowUDF};
use datafusion_sql::TableReference;
@@ -124,10 +122,7 @@ pub fn parser_expr_to_scalar_value_literal(
}
// 2. simplify logical expr
let execution_props = ExecutionProps::new().with_query_execution_start_time(Utc::now());
let info =
SimplifyContext::new(&execution_props).with_schema(Arc::new(empty_df_schema.clone()));
let info = SimplifyContext::default().with_current_time();
let simplifier = ExprSimplifier::new(info);
// Coerce the logical expression so simplifier can handle it correctly. This is necessary for const eval with possible type mismatch. i.e.: `now() - now() + '15s'::interval` which is `TimestampNanosecond - TimestampNanosecond + IntervalMonthDayNano`.
@@ -274,8 +269,6 @@ pub fn convert_month_day_nano_to_duration(
#[cfg(test)]
mod tests {
use std::sync::Arc;
use chrono::DateTime;
use datafusion::functions::datetime::expr_fn::now;
use datafusion_expr::lit;
@@ -324,9 +317,7 @@ mod tests {
),
];
let execution_props = ExecutionProps::new().with_query_execution_start_time(now_time);
let info = SimplifyContext::new(&execution_props).with_schema(Arc::new(DFSchema::empty()));
let info = SimplifyContext::default().with_query_execution_start_time(Some(now_time));
let simplifier = ExprSimplifier::new(info);
for (expr, expected) in testcases {
let expr_name = expr.schema_name().to_string();


@@ -114,6 +114,7 @@ impl ParserContext<'_> {
match cte.content {
CteContent::Sql(body) => sql_cte_tables.push(Cte {
alias: TableAlias {
explicit: false,
name: cte.name,
columns: cte
.columns


@@ -91,10 +91,7 @@ pub fn has_primary_key_option(column_def: &ColumnDef) -> bool {
column_def
.options
.iter()
.any(|options| match options.option {
ColumnOption::Unique { is_primary, .. } => is_primary,
_ => false,
})
.any(|options| matches!(options.option, ColumnOption::PrimaryKey(..)))
}
/// Create a `ColumnSchema` from `Column`.
@@ -198,15 +195,10 @@ pub fn sql_column_def_to_grpc_column_def(
.context(ConvertToGrpcDataTypeSnafu)?
.to_parts();
let is_primary_key = col.options.iter().any(|o| {
matches!(
o.option,
ColumnOption::Unique {
is_primary: true,
..
}
)
});
let is_primary_key = col
.options
.iter()
.any(|o| matches!(o.option, ColumnOption::PrimaryKey(..)));
let semantic_type = if is_primary_key {
SemanticType::Tag
@@ -375,7 +367,7 @@ mod tests {
use datatypes::schema::{
COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, FulltextAnalyzer,
};
use sqlparser::ast::{ColumnOptionDef, Expr};
use sqlparser::ast::{ColumnOptionDef, Expr, PrimaryKeyConstraint};
use super::*;
use crate::ast::TimezoneInfo;
@@ -505,10 +497,14 @@ mod tests {
data_type: SqlDataType::Double(ExactNumberInfo::None),
options: vec![ColumnOptionDef {
name: None,
option: ColumnOption::Unique {
is_primary: true,
option: ColumnOption::PrimaryKey(PrimaryKeyConstraint {
name: None,
index_name: None,
index_type: None,
columns: vec![],
index_options: vec![],
characteristics: None,
},
}),
}],
};
@@ -583,10 +579,14 @@ mod tests {
data_type: SqlDataType::Double(ExactNumberInfo::None),
options: vec![ColumnOptionDef {
name: None,
option: ColumnOption::Unique {
is_primary: true,
option: ColumnOption::PrimaryKey(PrimaryKeyConstraint {
name: None,
index_name: None,
index_type: None,
columns: vec![],
index_options: vec![],
characteristics: None,
},
}),
}],
};
assert!(has_primary_key_option(&column_def));