chore(deps): bump arrow and parquet to 36.0.0, and datafusion to the latest (#1282)

* chore: update arrow, parquet to 36.0 and datafusion

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update deps

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Apply suggestions from code review

Co-authored-by: LFC <bayinamine@gmail.com>

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: LFC <bayinamine@gmail.com>
This commit is contained in:
Ruihang Xia
2023-03-30 16:24:10 +08:00
committed by GitHub
parent 192fa0caa5
commit b5e5f8e555
21 changed files with 441 additions and 337 deletions

596
Cargo.lock generated

File diff suppressed because it is too large. Load Diff

View File

@@ -51,22 +51,22 @@ edition = "2021"
license = "Apache-2.0"
[workspace.dependencies]
arrow = { version = "34.0" }
arrow-array = "34.0"
arrow-flight = "34.0"
arrow-schema = { version = "34.0", features = ["serde"] }
arrow = { version = "36.0" }
arrow-array = "36.0"
arrow-flight = "36.0"
arrow-schema = { version = "36.0", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
chrono = { version = "0.4", features = ["serde"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "146a949218ec970784974137277cde3b4e547d0a" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "146a949218ec970784974137277cde3b4e547d0a" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "146a949218ec970784974137277cde3b4e547d0a" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "146a949218ec970784974137277cde3b4e547d0a" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "146a949218ec970784974137277cde3b4e547d0a" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "146a949218ec970784974137277cde3b4e547d0a" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "8e125d2ecf242b4f4b81f06839900dbb2037cc2a" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "8e125d2ecf242b4f4b81f06839900dbb2037cc2a" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "8e125d2ecf242b4f4b81f06839900dbb2037cc2a" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "8e125d2ecf242b4f4b81f06839900dbb2037cc2a" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "8e125d2ecf242b4f4b81f06839900dbb2037cc2a" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "8e125d2ecf242b4f4b81f06839900dbb2037cc2a" }
futures = "0.3"
futures-util = "0.3"
parquet = "34.0"
parquet = "36.0"
paste = "1.0"
prost = "0.11"
rand = "0.8"

View File

@@ -132,7 +132,7 @@ fn convert_record_batch(record_batch: RecordBatch) -> (Vec<Column>, u32) {
values: Some(values),
null_mask: array
.data()
.null_bitmap()
.nulls()
.map(|bitmap| bitmap.buffer().as_slice().to_vec())
.unwrap_or_default(),
datatype: datatype.into(),

View File

@@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use common_catalog::format_full_table_name;
use datafusion::common::{OwnedTableReference, ResolvedTableReference, TableReference};
use datafusion::common::{ResolvedTableReference, TableReference};
use datafusion::datasource::provider_as_source;
use datafusion::logical_expr::TableSource;
use session::context::QueryContext;
@@ -87,9 +87,8 @@ impl DfTableSourceProvider {
pub async fn resolve_table(
&mut self,
table_ref: OwnedTableReference,
table_ref: TableReference<'_>,
) -> Result<Arc<dyn TableSource>> {
let table_ref = table_ref.as_table_reference();
let table_ref = self.resolve_table_ref(table_ref)?;
let resolved_name = table_ref.to_string();

View File

@@ -581,7 +581,7 @@ pub fn expression_from_df_expr(
| Expr::ScalarSubquery(..)
| Expr::Placeholder { .. }
| Expr::QualifiedWildcard { .. } => todo!(),
Expr::GroupingSet(_) => UnsupportedExprSnafu {
Expr::GroupingSet(_) | Expr::OuterReferenceColumn(_, _) => UnsupportedExprSnafu {
name: expr.to_string(),
}
.fail()?,

View File

@@ -22,9 +22,10 @@ use catalog::CatalogManagerRef;
use common_catalog::format_full_table_name;
use common_telemetry::debug;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::common::{DFField, DFSchema, OwnedTableReference};
use datafusion::common::{DFField, DFSchema};
use datafusion::datasource::DefaultTableSource;
use datafusion::physical_plan::project_schema;
use datafusion::sql::TableReference;
use datafusion_expr::{Filter, LogicalPlan, TableScan};
use prost::Message;
use session::context::QueryContext;
@@ -240,13 +241,13 @@ impl DFLogicalSubstraitConvertor {
.projection
.map(|mask_expr| self.convert_mask_expression(mask_expr));
let table_ref = OwnedTableReference::Full {
catalog: catalog_name.clone(),
schema: schema_name.clone(),
table: table_name.clone(),
};
let table_ref = TableReference::full(
catalog_name.clone(),
schema_name.clone(),
table_name.clone(),
);
let adapter = table_provider
.resolve_table(table_ref)
.resolve_table(table_ref.clone())
.await
.with_context(|_| ResolveTableSnafu {
table_name: format_full_table_name(&catalog_name, &schema_name, &table_name),
@@ -272,14 +273,13 @@ impl DFLogicalSubstraitConvertor {
};
// Calculate the projected schema
let qualified = &format_full_table_name(&catalog_name, &schema_name, &table_name);
let projected_schema = Arc::new(
project_schema(&stored_schema, projection.as_ref())
.and_then(|x| {
DFSchema::new_with_metadata(
x.fields()
.iter()
.map(|f| DFField::from_qualified(qualified, f.clone()))
.map(|f| DFField::from_qualified(table_ref.clone(), f.clone()))
.collect(),
x.metadata().clone(),
)
@@ -291,7 +291,7 @@ impl DFLogicalSubstraitConvertor {
// TODO(ruihang): Support limit(fetch)
Ok(LogicalPlan::TableScan(TableScan {
table_name: qualified.to_string(),
table_name: table_ref,
source: adapter,
projection,
projected_schema,
@@ -620,10 +620,13 @@ mod test {
let projected_schema =
Arc::new(DFSchema::new_with_metadata(projected_fields, Default::default()).unwrap());
let table_name = TableReference::full(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
DEFAULT_TABLE_NAME,
);
let table_scan_plan = LogicalPlan::TableScan(TableScan {
table_name: format!(
"{DEFAULT_CATALOG_NAME}.{DEFAULT_SCHEMA_NAME}.{DEFAULT_TABLE_NAME}",
),
table_name,
source: adapter,
projection: Some(projection),
projected_schema,

View File

@@ -20,7 +20,7 @@ use common_query::physical_plan::SessionContext;
use common_query::Output;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::parquet::basic::{Compression, Encoding};
use datafusion::parquet::basic::{Compression, Encoding, ZstdLevel};
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::physical_plan::RecordBatchStream;
use futures::TryStreamExt;
@@ -94,7 +94,7 @@ impl ParquetWriter {
pub async fn flush(&mut self) -> Result<usize> {
let schema = self.stream.as_ref().schema();
let writer_props = WriterProperties::builder()
.set_compression(Compression::ZSTD)
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_encoding(Encoding::PLAIN)
.set_max_row_group_size(self.max_row_group_size)
.build();

View File

@@ -13,13 +13,13 @@
// limitations under the License.
use arrow::array::ArrayData;
use arrow::bitmap::Bitmap;
use arrow::buffer::NullBuffer;
#[derive(Debug, PartialEq)]
enum ValidityKind<'a> {
/// Whether the array slot is valid or not (null).
Slots {
bitmap: &'a Bitmap,
bitmap: &'a NullBuffer,
len: usize,
null_count: usize,
},
@@ -38,7 +38,7 @@ pub struct Validity<'a> {
impl<'a> Validity<'a> {
/// Creates a `Validity` from [`ArrayData`].
pub fn from_array_data(data: &'a ArrayData) -> Validity<'a> {
match data.null_bitmap() {
match data.nulls() {
Some(bitmap) => Validity {
kind: ValidityKind::Slots {
bitmap,
@@ -67,7 +67,7 @@ impl<'a> Validity<'a> {
/// Returns whether `i-th` bit is set.
pub fn is_set(&self, i: usize) -> bool {
match self.kind {
ValidityKind::Slots { bitmap, .. } => bitmap.is_set(i),
ValidityKind::Slots { bitmap, .. } => bitmap.is_valid(i),
ValidityKind::AllValid { len } => i < len,
ValidityKind::AllNull { .. } => false,
}

View File

@@ -30,6 +30,7 @@ use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
};
use datafusion::prelude::Expr;
use datafusion::sql::TableReference;
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::record_batch::RecordBatch;
@@ -56,12 +57,17 @@ impl EmptyMetric {
let schema = Arc::new(DFSchema::new_with_metadata(
vec![
DFField::new(
None,
None::<TableReference>,
&time_index_column_name,
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
DFField::new(None, &value_column_name, DataType::Float64, true),
DFField::new(
None::<TableReference>,
&value_column_name,
DataType::Float64,
true,
),
],
HashMap::new(),
)?);

View File

@@ -33,6 +33,7 @@ use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
Statistics,
};
use datafusion::sql::TableReference;
use futures::{Stream, StreamExt};
use crate::extension_plan::Millisecond;
@@ -106,7 +107,7 @@ impl RangeManipulate {
// process time index column
// the raw timestamp field is preserved. And a new timestamp_range field is appended to the last.
let Some(ts_col_index) = input_schema.index_of_column_by_name(None, time_index)? else {
return Err(datafusion::common::field_not_found(None, time_index, input_schema.as_ref()))
return Err(datafusion::common::field_not_found(None::<TableReference>, time_index, input_schema.as_ref()))
};
let timestamp_range_field = columns[ts_col_index]
.field()
@@ -119,7 +120,7 @@ impl RangeManipulate {
// process value columns
for name in value_columns {
let Some(index) = input_schema.index_of_column_by_name(None, name)? else {
return Err(datafusion::common::field_not_found(None, name, input_schema.as_ref()))
return Err(datafusion::common::field_not_found(None::<TableReference>, name, input_schema.as_ref()))
};
columns[index] = DFField::from(RangeArray::convert_field(columns[index].field()));
}

View File

@@ -30,6 +30,7 @@ use datafusion::logical_expr::{
use datafusion::optimizer::utils;
use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use promql_parser::label::{MatchOp, Matchers, METRIC_NAME};
use promql_parser::parser::{
@@ -562,15 +563,13 @@ impl PromPlanner {
table_name: &str,
filter: Vec<DfExpr>,
) -> Result<LogicalPlan> {
let table_ref = OwnedTableReference::Bare {
table: table_name.to_string(),
};
let table_ref = OwnedTableReference::bare(table_name.to_string());
let provider = self
.table_provider
.resolve_table(table_ref)
.resolve_table(table_ref.clone())
.await
.context(CatalogSnafu)?;
let result = LogicalPlanBuilder::scan_with_filters(table_name, provider, None, filter)
let result = LogicalPlanBuilder::scan_with_filters(table_ref, provider, None, filter)
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?;
@@ -586,9 +585,7 @@ impl PromPlanner {
.context(TableNameNotFoundSnafu)?;
let table = self
.table_provider
.resolve_table(OwnedTableReference::Bare {
table: table_name.to_string(),
})
.resolve_table(TableReference::bare(&table_name))
.await
.context(CatalogSnafu)?
.as_any()

View File

@@ -14,6 +14,7 @@
//An extended "array" based on [DictionaryArray].
use datafusion::arrow::buffer::NullBuffer;
use datafusion::arrow::datatypes::Field;
use datatypes::arrow::array::{Array, ArrayData, ArrayRef, DictionaryArray, Int64Array};
use datatypes::arrow::datatypes::{DataType, Int64Type};
@@ -124,10 +125,10 @@ impl RangeArray {
.len(key_array.len())
.add_buffer(key_array.data().buffers()[0].clone())
.add_child_data(values.data().clone());
match key_array.data().null_buffer() {
match key_array.data().nulls() {
Some(buffer) if key_array.data().null_count() > 0 => {
data = data
.null_bit_buffer(Some(buffer.clone()))
.nulls(Some(buffer.clone()))
.null_count(key_array.data().null_count());
}
_ => data = data.null_count(0),
@@ -223,6 +224,18 @@ impl Array for RangeArray {
/// Consumes this `RangeArray` and returns the [`ArrayData`] obtained by
/// calling `into_data` on the wrapped `array` field.
fn into_data(self) -> ArrayData {
self.array.into_data()
}
/// Returns the [`ArrayData`] of the wrapped `array` field without consuming
/// `self`; the work is delegated entirely to the inner array's `to_data`.
fn to_data(&self) -> ArrayData {
self.array.to_data()
}
/// Returns a slice of the wrapped `array` field, forwarding `offset` and
/// `length` unchanged to the inner array's `slice`.
///
/// NOTE(review): the returned [`ArrayRef`] is whatever the inner array
/// produces, so it is presumably not a `RangeArray` itself — confirm that
/// callers do not rely on getting a `RangeArray` back.
fn slice(&self, offset: usize, length: usize) -> ArrayRef {
self.array.slice(offset, length)
}
/// Returns the optional [`NullBuffer`] reported by the wrapped `array`
/// field's `nulls`, borrowed for the lifetime of `self`.
fn nulls(&self) -> Option<&NullBuffer> {
self.array.nulls()
}
}
impl std::fmt::Debug for RangeArray {

View File

@@ -93,10 +93,7 @@ impl DatafusionQueryEngine {
let default_catalog = query_ctx.current_catalog();
let default_schema = query_ctx.current_schema();
let table_name = dml
.table_name
.as_table_reference()
.resolve(&default_catalog, &default_schema);
let table_name = dml.table_name.resolve(&default_catalog, &default_schema);
let table = self.find_table(&table_name).await?;
let output = self

View File

@@ -79,7 +79,7 @@ async fn resolve_tables(
for table_name in table_names {
let resolved_name = table_provider
.resolve_table_ref(table_name.as_table_reference())
.resolve_table_ref(table_name.clone())
.context(CatalogSnafu)?;
if let Entry::Vacant(v) = tables.entry(resolved_name.to_string()) {

View File

@@ -18,8 +18,8 @@ use std::sync::Arc;
use common_time::timestamp::{TimeUnit, Timestamp};
use datafusion::optimizer::optimizer::OptimizerRule;
use datafusion::optimizer::OptimizerConfig;
use datafusion_common::tree_node::{TreeNode, TreeNodeRewriter};
use datafusion_common::{DFSchemaRef, DataFusionError, Result, ScalarValue};
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
use datafusion_expr::{
Between, BinaryExpr, Expr, ExprSchemable, Filter, LogicalPlan, Operator, TableScan,
};
@@ -200,7 +200,9 @@ impl<'a> TypeConverter<'a> {
}
}
impl<'a> ExprRewriter for TypeConverter<'a> {
impl<'a> TreeNodeRewriter for TypeConverter<'a> {
type N = Expr;
fn mutate(&mut self, expr: Expr) -> Result<Expr> {
let new_expr = match expr {
Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
@@ -299,6 +301,7 @@ mod tests {
use std::collections::HashMap;
use datafusion_common::{Column, DFField, DFSchema};
use datafusion_sql::TableReference;
use super::*;
@@ -358,7 +361,7 @@ mod tests {
let schema_ref = Arc::new(
DFSchema::new_with_metadata(
vec![DFField::new(
None,
None::<TableReference>,
"ts",
DataType::Timestamp(ArrowTimeUnit::Millisecond, None),
true,
@@ -390,7 +393,12 @@ mod tests {
let col_name = "is_valid";
let schema_ref = Arc::new(
DFSchema::new_with_metadata(
vec![DFField::new(None, col_name, DataType::Boolean, false)],
vec![DFField::new(
None::<TableReference>,
col_name,
DataType::Boolean,
false,
)],
HashMap::new(),
)
.unwrap(),

View File

@@ -61,7 +61,7 @@ impl DfLogicalPlanner {
)
.await?;
let config_options = self.session_state.config().config_options();
let config_options = self.session_state.config().options();
let parser_options = ParserOptions {
enable_ident_normalization: config_options.sql_parser.enable_ident_normalization,
parse_float_as_decimal: config_options.sql_parser.parse_float_as_decimal,

View File

@@ -39,7 +39,7 @@ use futures_util::{Stream, StreamExt, TryStreamExt};
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::basic::{Compression, Encoding};
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterProperties;
use parquet::format::FileMetaData;
@@ -91,7 +91,7 @@ impl<'a> ParquetWriter<'a> {
let schema = store_schema.arrow_schema().clone();
let writer_props = WriterProperties::builder()
.set_compression(Compression::ZSTD)
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_encoding(Encoding::PLAIN)
.set_max_row_group_size(self.max_row_group_size)
.set_key_value_metadata(extra_meta.map(|map| {

View File

@@ -194,7 +194,7 @@ SELECT a-10 AS k FROM test UNION SELECT a-10 AS l FROM test ORDER BY k;
SELECT a-10 AS k FROM test UNION SELECT a-10 AS l FROM test ORDER BY l;
Error: 3000(PlanQuery), No field named 'l'. Valid fields are 'k'.
Error: 3000(PlanQuery), No field named "l". Valid fields are "k".
SELECT a-10 AS k FROM test UNION SELECT a-10 AS l FROM test ORDER BY 1-k;

View File

@@ -36,7 +36,7 @@ SELECT a AS k, b FROM test UNION SELECT a AS k, b FROM test ORDER BY k;
SELECT a % 2, b FROM test UNION SELECT b, a % 2 AS k ORDER BY a % 2;
Error: 3000(PlanQuery), No field named 'b'.
Error: 3000(PlanQuery), No field named "b".
SELECT a % 2, b FROM test UNION SELECT a % 2 AS k, b FROM test ORDER BY a % 2;

View File

@@ -24,15 +24,15 @@ select 4 + 0.5;
select "a";
Error: 3000(PlanQuery), No field named 'a'.
Error: 3000(PlanQuery), No field named "a".
select "A";
Error: 3000(PlanQuery), No field named 'A'.
Error: 3000(PlanQuery), No field named "A".
select * where "a" = "A";
Error: 3000(PlanQuery), No field named 'a'.
Error: 3000(PlanQuery), No field named "a".
select TO_UNIXTIME('2023-03-01T06:35:02Z');

View File

@@ -16,27 +16,27 @@ SELECT a FROM test LIMIT 1;
SELECT a FROM test LIMIT 1.25;
Error: 3000(PlanQuery), Error during planning: Unexpected expression for LIMIT clause
Error: 3000(PlanQuery), Error during planning: LIMIT must not be negative
SELECT a FROM test LIMIT 2-1;
Error: 3000(PlanQuery), Error during planning: Unexpected expression for LIMIT clause
Error: 3000(PlanQuery), Error during planning: LIMIT must not be negative
SELECT a FROM test LIMIT a;
Error: 3000(PlanQuery), Error during planning: Unexpected expression for LIMIT clause
Error: 3000(PlanQuery), Error during planning: LIMIT must not be negative
SELECT a FROM test LIMIT a+1;
Error: 3000(PlanQuery), Error during planning: Unexpected expression for LIMIT clause
Error: 3000(PlanQuery), Error during planning: LIMIT must not be negative
SELECT a FROM test LIMIT SUM(42);
Error: 3000(PlanQuery), Error during planning: Unexpected expression for LIMIT clause
Error: 3000(PlanQuery), Error during planning: LIMIT must not be negative
SELECT a FROM test LIMIT row_number() OVER ();
Error: 3000(PlanQuery), Error during planning: Unexpected expression for LIMIT clause
Error: 3000(PlanQuery), Error during planning: LIMIT must not be negative
CREATE TABLE test2 (a STRING, ts BIGINT TIME INDEX);
@@ -56,7 +56,7 @@ SELECT * FROM test2 LIMIT 3;
select 1 limit date '1992-01-01';
Error: 3000(PlanQuery), Error during planning: Unexpected expression for LIMIT clause
Error: 3000(PlanQuery), Error during planning: LIMIT must not be negative
CREATE TABLE integers(i BIGINT TIME INDEX);
@@ -89,7 +89,7 @@ SELECT * FROM integers LIMIT 4;
SELECT * FROM integers as int LIMIT (SELECT MIN(integers.i) FROM integers);
Error: 3000(PlanQuery), Error during planning: Unexpected expression for LIMIT clause
Error: 3000(PlanQuery), Error during planning: LIMIT must not be negative
SELECT * FROM integers as int OFFSET (SELECT MIN(integers.i) FROM integers);
@@ -101,23 +101,23 @@ Error: 3000(PlanQuery), Error during planning: Unexpected expression in OFFSET c
SELECT * FROM integers as int LIMIT (SELECT max(integers.i) FROM integers where i > 5);
Error: 3000(PlanQuery), Error during planning: Unexpected expression for LIMIT clause
Error: 3000(PlanQuery), Error during planning: LIMIT must not be negative
SELECT * FROM integers as int LIMIT (SELECT max(integers.i) FROM integers where i > 5);
Error: 3000(PlanQuery), Error during planning: Unexpected expression for LIMIT clause
Error: 3000(PlanQuery), Error during planning: LIMIT must not be negative
SELECT * FROM integers as int LIMIT (SELECT NULL);
Error: 3000(PlanQuery), Error during planning: Unexpected expression for LIMIT clause
Error: 3000(PlanQuery), Error during planning: LIMIT must not be negative
SELECT * FROM integers as int LIMIT (SELECT -1);
Error: 3000(PlanQuery), Error during planning: Unexpected expression for LIMIT clause
Error: 3000(PlanQuery), Error during planning: LIMIT must not be negative
SELECT * FROM integers as int LIMIT (SELECT 'ab');
Error: 3000(PlanQuery), Error during planning: Unexpected expression for LIMIT clause
Error: 3000(PlanQuery), Error during planning: LIMIT must not be negative
DROP TABLE integers;