feat!: Bump datafusion, prost, hyper, tonic, tower, axum (#5417)

* change dep

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: adapt to arrow's interval array

* chore: fix compile errors in datatypes crate

* chore: fix api crate compiler errors

* chore: fix compiler errors in common-grpc

* chore: fix common-datasource errors

* chore: fix deprecated code in common-datasource

* fix promql and physical plan related

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* wip: upgrading network deps

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* block on updating `sqlparser`

* upgrade sqlparser

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* adapt new df's trait requirements

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: fix compiler errors in mito2

* chore: fix common-function crate errors

* chore: fix catalog errors

* change import path

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: fix some errors in query crate

* chore: fix some errors in query crate

* aggr expr and some other tiny fixes

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: fix expr related errors in query crate

* chore: fix query serializer and admin command

* chore: fix grpc services

* feat: axum serve

* chore: fix http server

* remove handle_error handler
* refactor timeout layer
* serve axum

* chore: fix flow aggr functions

* chore: fix flow

* feat: fix errors in meta-srv

* boxed()
* use TokioIo

* feat!: Remove script crate and python feature (#5321)

* feat: exclude script crate

* chore: simplify feature

* feat: remove the script crate

* chore: remove python feature and some comments

* chore: fix warning

* chore: fix servers tests compiler errors

* feat: fix tests-integration errors

* chore: fix unused

* test: fix catalog test

* chore: fix compiler errors for crates using common-meta

testing feature is enabled when check with --workspace

* test: use display for logical plan test

* test: implement rewrite for ScanHintRule

* fix: http server build panic

* test: fix mito test

* fix: sql parser type alias error

* test: fix TestClient not listen

* test: some flow tests

* test(flow): more fix

* fix: test_otlp_logs

* test: fix promql test that using deprecated method fun()

* fix: sql type replace supports Int8 ~ Int64, UInt8 ~ UInt64

* test: fix infer schema test case

* test: fix tests related to plan display

* chore: fix last flow test

* test: fix function format related assertion

* test: use larger port range for tests

* fix: test_otlp_traces

* fix: test_otlp_metrics

* fix range query and dist plan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: flow handle distinct use deprecated field

* fix: can't pass Join plan expressions to LogicalPlan::with_new_exprs

* test: fix deserialize test

* test: reduce split key case num

* tests: lower case aggr func name

* test: fix some sqlness tests

* tests: more sqlness fix

* tests: fixed sqlness test

* commit non-bug changes

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: make our udf correct

* fix: implement empty methods of ContextProvider for DfContextProviderAdapter

* test: update sqlness test result

* chore: remove unused

* fix: provide alias name for AggregateExprBuilder in range plan

* test: update range query result

* fix: implement missing ContextProvider methods for DfContextProviderAdapter

* test: update timestamps, cte result

* fix: supports empty projection in mito

* test: update comment for cte test

* fix: support projection for numbers

* test: update test cases after projection fix

* fix: fix range select first_value/last_value

* fix: handle CAST and time index conflict

* fix: handle order by correctly in range first_value/last_value

* test: update sqlness result

* test: update view test result

* test: update decimal test

wait for https://github.com/apache/datafusion/pull/14126 to fix this

* feat: remove redundant physical optimization

todo(ruihang): Check if we can remove this.

* test: update sqlness test result

* chore: range select default sort use nulls_first = false

* test: update filter push down test result

* test: comment deciaml test to avoid different panic message

* test: update some distributed test result

* test: update test for distributed count and filter push down

* test: update subqueries test

* fix: SessionState may overwrite our UDFs

* chore: fix compiler errors after merging main

* fix: fix elasticsearch and dashboard router panic

* chore: fix common-functions tests

* chore: update sqlness result

* test: fix id keyword and update sqlness result

* test: fix flow_null test

* fix: enlarge thread size in debug mode to avoid overflow

* chore: fix warnings in common-function

* chore: fix warning in flow

* chore: fix warnings in query crate

* chore: remove unused warnings

* chore: fix deprecated warnings for parquet

* chore: fix deprecated warning in servers crate

* style: fix clippy

* test: enlarge mito cache tttl test ttl time

* chore: fix typo

* style: fmt toml

* refactor: reimplement PartialOrd for RangeSelect

* chore: remove script crate files introduced by merge

* fix: return error if sql option is not kv

* chore: do not use ..default::default()

* chore: per review

* chore: update error message in BuildAdminFunctionArgsSnafu

Co-authored-by: jeremyhi <jiachun_feng@proton.me>

* refactor: typed precision

* update sqlness view case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: flow per review

* chore: add example in comment

* chore: warn if parquet stats of timestamp is not INT64

* style: add a newline before derive to make the comment more clear

* test: update sqlness result

* fix: flow from substrait

* chore: change update_range_context log to debug level

* chore: move axum-extra axum-macros to workspace

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: luofucong <luofc@foxmail.com>
Co-authored-by: discord9 <discord9@163.com>
Co-authored-by: shuiyisong <xixing.sys@gmail.com>
Co-authored-by: jeremyhi <jiachun_feng@proton.me>
This commit is contained in:
Yingwen
2025-01-23 14:15:40 +08:00
committed by GitHub
parent 3ed085459c
commit 35b635f639
293 changed files with 4067 additions and 3225 deletions

1554
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -88,14 +88,17 @@ rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
# See for more detaiils: https://github.com/rust-lang/cargo/issues/11329
ahash = { version = "0.8", features = ["compile-time-rng"] }
aquamarine = "0.3"
arrow = { version = "51.0.0", features = ["prettyprint"] }
arrow-array = { version = "51.0.0", default-features = false, features = ["chrono-tz"] }
arrow-flight = "51.0"
arrow-ipc = { version = "51.0.0", default-features = false, features = ["lz4", "zstd"] }
arrow-schema = { version = "51.0", features = ["serde"] }
arrow = { version = "53.0.0", features = ["prettyprint"] }
arrow-array = { version = "53.0.0", default-features = false, features = ["chrono-tz"] }
arrow-flight = "53.0"
arrow-ipc = { version = "53.0.0", default-features = false, features = ["lz4", "zstd"] }
arrow-schema = { version = "53.0", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
axum = { version = "0.6", features = ["headers"] }
# Remember to update axum-extra, axum-macros when updating axum
axum = "0.8"
axum-extra = "0.10"
axum-macros = "0.4"
backon = "1"
base64 = "0.21"
bigdecimal = "0.4.2"
@@ -107,32 +110,35 @@ clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
crossbeam-utils = "0.8"
dashmap = "5.4"
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
datafusion-functions = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
datafusion-physical-plan = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" }
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
datafusion-optimizer = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
datafusion-sql = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
datafusion-substrait = { git = "https://github.com/apache/datafusion.git", rev = "2464703c84c400a09cc59277018813f0e797bb4e" }
deadpool = "0.10"
deadpool-postgres = "0.12"
derive_builder = "0.12"
dotenv = "0.15"
etcd-client = "0.13"
etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "6cee3db98a552f1dd848dec3eefcce8f26343748" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "683e9d10ae7f3dfb8aaabd89082fc600c17e3795" }
hex = "0.4"
http = "0.2"
http = "1"
humantime = "2.1"
humantime-serde = "1.1"
hyper = "1.1"
hyper-util = "0.1"
itertools = "0.10"
jsonb = { git = "https://github.com/databendlabs/jsonb.git", rev = "8c8d2fc294a39f3ff08909d60f718639cfba3875", default-features = false }
lazy_static = "1.4"
local-ip-address = "0.6"
loki-api = { git = "https://github.com/shuiyisong/tracing-loki", branch = "chore/prost_version" }
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "a10facb353b41460eeb98578868ebf19c2084fac" }
mockall = "0.11.4"
moka = "0.12"
@@ -140,7 +146,7 @@ nalgebra = "0.33"
notify = "6.1"
num_cpus = "1.16"
once_cell = "1.18"
opentelemetry-proto = { version = "0.5", features = [
opentelemetry-proto = { version = "0.27", features = [
"gen-tonic",
"metrics",
"trace",
@@ -148,12 +154,12 @@ opentelemetry-proto = { version = "0.5", features = [
"logs",
] }
parking_lot = "0.12"
parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
parquet = { version = "53.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
pin-project = "1.0"
prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { version = "0.4.3", features = ["ser"] }
prost = "0.12"
prost = "0.13"
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.8"
ratelimit = "0.9"
@@ -172,6 +178,7 @@ rstest = "0.21"
rstest_reuse = "0.7"
rust_decimal = "1.33"
rustc-hash = "2.0"
rustls = { version = "0.23.20", default-features = false } # override by patch, see [patch.crates-io]
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["float_roundtrip"] }
serde_with = "3"
@@ -180,9 +187,8 @@ similar-asserts = "1.6.0"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
sysinfo = "0.30"
rustls = { version = "0.23.20", default-features = false } # override by patch, see [patch.crates-io]
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "54a267ac89c09b11c0c88934690530807185d3e7", features = [
# on branch v0.52.x
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "71dd86058d2af97b9925093d40c4e03360403170", features = [
"visitor",
"serde",
] } # on branch v0.44.x
@@ -194,8 +200,8 @@ tokio-rustls = { version = "0.26.0", default-features = false } # override by pa
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.8.8"
tonic = { version = "0.11", features = ["tls", "gzip", "zstd"] }
tower = "0.4"
tonic = { version = "0.12", features = ["tls", "gzip", "zstd"] }
tower = "0.5"
tracing-appender = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt"] }
typetag = "0.2"

View File

@@ -33,7 +33,7 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: prost::DecodeError,
error: prost::UnknownEnumValue,
},
#[snafu(display("Failed to create column datatype from {:?}", from))]

View File

@@ -86,7 +86,7 @@ impl ColumnDataTypeWrapper {
/// Get a tuple of ColumnDataType and ColumnDataTypeExtension.
pub fn to_parts(&self) -> (ColumnDataType, Option<ColumnDataTypeExtension>) {
(self.datatype, self.datatype_ext.clone())
(self.datatype, self.datatype_ext)
}
}
@@ -685,14 +685,18 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) ->
IntervalType::YearMonth(_) => Arc::new(IntervalYearMonthVector::from_vec(
values.interval_year_month_values,
)),
IntervalType::DayTime(_) => Arc::new(IntervalDayTimeVector::from_vec(
values.interval_day_time_values,
IntervalType::DayTime(_) => Arc::new(IntervalDayTimeVector::from_iter_values(
values
.interval_day_time_values
.iter()
.map(|x| IntervalDayTime::from_i64(*x).into()),
)),
IntervalType::MonthDayNano(_) => {
Arc::new(IntervalMonthDayNanoVector::from_iter_values(
values.interval_month_day_nano_values.iter().map(|x| {
IntervalMonthDayNano::new(x.months, x.days, x.nanoseconds).to_i128()
}),
values
.interval_month_day_nano_values
.iter()
.map(|x| IntervalMonthDayNano::new(x.months, x.days, x.nanoseconds).into()),
))
}
},
@@ -1495,14 +1499,22 @@ mod tests {
column.values.as_ref().unwrap().interval_year_month_values
);
let vector = Arc::new(IntervalDayTimeVector::from_vec(vec![4, 5, 6]));
let vector = Arc::new(IntervalDayTimeVector::from_vec(vec![
IntervalDayTime::new(0, 4).into(),
IntervalDayTime::new(0, 5).into(),
IntervalDayTime::new(0, 6).into(),
]));
push_vals(&mut column, 3, vector);
assert_eq!(
vec![4, 5, 6],
column.values.as_ref().unwrap().interval_day_time_values
);
let vector = Arc::new(IntervalMonthDayNanoVector::from_vec(vec![7, 8, 9]));
let vector = Arc::new(IntervalMonthDayNanoVector::from_vec(vec![
IntervalMonthDayNano::new(0, 0, 7).into(),
IntervalMonthDayNano::new(0, 0, 8).into(),
IntervalMonthDayNano::new(0, 0, 9).into(),
]));
let len = vector.len();
push_vals(&mut column, 3, vector);
(0..len).for_each(|i| {

View File

@@ -34,10 +34,8 @@ const SKIPPING_INDEX_GRPC_KEY: &str = "skipping_index";
/// Tries to construct a `ColumnSchema` from the given `ColumnDef`.
pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
let data_type = ColumnDataTypeWrapper::try_new(
column_def.data_type,
column_def.datatype_extension.clone(),
)?;
let data_type =
ColumnDataTypeWrapper::try_new(column_def.data_type, column_def.datatype_extension)?;
let constraint = if column_def.default_constraint.is_empty() {
None

View File

@@ -41,6 +41,7 @@ pub mod information_schema {
}
pub mod table_source;
#[async_trait::async_trait]
pub trait CatalogManager: Send + Sync {
fn as_any(&self) -> &dyn Any;

View File

@@ -64,6 +64,7 @@ const INIT_CAPACITY: usize = 42;
/// - `uptime`: the uptime of the peer.
/// - `active_time`: the time since the last activity of the peer.
///
#[derive(Debug)]
pub(super) struct InformationSchemaClusterInfo {
schema: SchemaRef,
catalog_manager: Weak<dyn CatalogManager>,

View File

@@ -45,6 +45,7 @@ use crate::error::{
use crate::information_schema::Predicates;
use crate::CatalogManager;
#[derive(Debug)]
pub(super) struct InformationSchemaColumns {
schema: SchemaRef,
catalog_name: String,

View File

@@ -61,7 +61,7 @@ pub const FLOWNODE_IDS: &str = "flownode_ids";
pub const OPTIONS: &str = "options";
/// The `information_schema.flows` to provides information about flows in databases.
///
#[derive(Debug)]
pub(super) struct InformationSchemaFlows {
schema: SchemaRef,
catalog_name: String,

View File

@@ -62,6 +62,7 @@ pub(crate) const FULLTEXT_INDEX_CONSTRAINT_NAME: &str = "FULLTEXT INDEX";
pub(crate) const SKIPPING_INDEX_CONSTRAINT_NAME: &str = "SKIPPING INDEX";
/// The virtual table implementation for `information_schema.KEY_COLUMN_USAGE`.
#[derive(Debug)]
pub(super) struct InformationSchemaKeyColumnUsage {
schema: SchemaRef,
catalog_name: String,

View File

@@ -59,6 +59,7 @@ const INIT_CAPACITY: usize = 42;
/// The `PARTITIONS` table provides information about partitioned tables.
/// See https://dev.mysql.com/doc/refman/8.0/en/information-schema-partitions-table.html
/// We provide an extral column `greptime_partition_id` for GreptimeDB region id.
#[derive(Debug)]
pub(super) struct InformationSchemaPartitions {
schema: SchemaRef,
catalog_name: String,

View File

@@ -56,7 +56,7 @@ const INIT_CAPACITY: usize = 42;
/// - `end_time`: the ending execution time of the procedure.
/// - `status`: the status of the procedure.
/// - `lock_keys`: the lock keys of the procedure.
///
#[derive(Debug)]
pub(super) struct InformationSchemaProcedureInfo {
schema: SchemaRef,
catalog_manager: Weak<dyn CatalogManager>,

View File

@@ -59,7 +59,7 @@ const INIT_CAPACITY: usize = 42;
/// - `is_leader`: whether the peer is the leader
/// - `status`: the region status, `ALIVE` or `DOWNGRADED`.
/// - `down_seconds`: the duration of being offline, in seconds.
///
#[derive(Debug)]
pub(super) struct InformationSchemaRegionPeers {
schema: SchemaRef,
catalog_name: String,

View File

@@ -63,7 +63,7 @@ const INIT_CAPACITY: usize = 42;
/// - `index_size`: The sst index files size in bytes.
/// - `engine`: The engine type.
/// - `region_role`: The region role.
///
#[derive(Debug)]
pub(super) struct InformationSchemaRegionStatistics {
schema: SchemaRef,
catalog_manager: Weak<dyn CatalogManager>,

View File

@@ -38,6 +38,7 @@ use store_api::storage::{ScanRequest, TableId};
use super::{InformationTable, RUNTIME_METRICS};
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
#[derive(Debug)]
pub(super) struct InformationSchemaMetrics {
schema: SchemaRef,
}

View File

@@ -49,6 +49,7 @@ pub const SCHEMA_OPTS: &str = "options";
const INIT_CAPACITY: usize = 42;
/// The `information_schema.schemata` table implementation.
#[derive(Debug)]
pub(super) struct InformationSchemaSchemata {
schema: SchemaRef,
catalog_name: String,

View File

@@ -43,6 +43,7 @@ use crate::information_schema::Predicates;
use crate::CatalogManager;
/// The `TABLE_CONSTRAINTS` table describes which tables have constraints.
#[derive(Debug)]
pub(super) struct InformationSchemaTableConstraints {
schema: SchemaRef,
catalog_name: String,

View File

@@ -71,6 +71,7 @@ const TABLE_ID: &str = "table_id";
pub const ENGINE: &str = "engine";
const INIT_CAPACITY: usize = 42;
#[derive(Debug)]
pub(super) struct InformationSchemaTables {
schema: SchemaRef,
catalog_name: String,

View File

@@ -54,6 +54,7 @@ pub const CHARACTER_SET_CLIENT: &str = "character_set_client";
pub const COLLATION_CONNECTION: &str = "collation_connection";
/// The `information_schema.views` to provides information about views in databases.
#[derive(Debug)]
pub(super) struct InformationSchemaViews {
schema: SchemaRef,
catalog_name: String,

View File

@@ -33,6 +33,7 @@ use super::SystemTable;
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
/// A memory table with specified schema and columns.
#[derive(Debug)]
pub(crate) struct MemoryTable {
pub(crate) table_id: TableId,
pub(crate) table_name: &'static str,

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
@@ -100,6 +101,15 @@ impl PGClass {
}
}
impl fmt::Debug for PGClass {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PGClass")
.field("schema", &self.schema)
.field("catalog_name", &self.catalog_name)
.finish()
}
}
impl SystemTable for PGClass {
fn table_id(&self) -> table::metadata::TableId {
PG_CATALOG_PG_CLASS_TABLE_ID

View File

@@ -55,6 +55,15 @@ pub(super) struct PGDatabase {
namespace_oid_map: PGNamespaceOidMapRef,
}
impl std::fmt::Debug for PGDatabase {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PGDatabase")
.field("schema", &self.schema)
.field("catalog_name", &self.catalog_name)
.finish()
}
}
impl PGDatabase {
pub(super) fn new(
catalog_name: String,

View File

@@ -17,6 +17,7 @@
pub(super) mod oid_map;
use std::fmt;
use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
@@ -87,6 +88,15 @@ impl PGNamespace {
}
}
impl fmt::Debug for PGNamespace {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PGNamespace")
.field("schema", &self.schema)
.field("catalog_name", &self.catalog_name)
.finish()
}
}
impl SystemTable for PGNamespace {
fn schema(&self) -> SchemaRef {
self.schema.clone()

View File

@@ -365,7 +365,7 @@ mod tests {
Projection: person.id AS a, person.name AS b
Filter: person.id > Int32(500)
TableScan: person"#,
format!("\n{:?}", source.get_logical_plan().unwrap())
format!("\n{}", source.get_logical_plan().unwrap())
);
}
}

View File

@@ -15,12 +15,12 @@
//! Dummy catalog for region server.
use std::any::Any;
use std::fmt;
use std::sync::Arc;
use async_trait::async_trait;
use common_catalog::format_full_table_name;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider};
use datafusion::datasource::TableProvider;
use snafu::OptionExt;
use table::table::adapter::DfTableProviderAdapter;
@@ -41,6 +41,12 @@ impl DummyCatalogList {
}
}
impl fmt::Debug for DummyCatalogList {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DummyCatalogList").finish()
}
}
impl CatalogProviderList for DummyCatalogList {
fn as_any(&self) -> &dyn Any {
self
@@ -91,6 +97,14 @@ impl CatalogProvider for DummyCatalogProvider {
}
}
impl fmt::Debug for DummyCatalogProvider {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DummyCatalogProvider")
.field("catalog_name", &self.catalog_name)
.finish()
}
}
/// A dummy schema provider for [DummyCatalogList].
#[derive(Clone)]
struct DummySchemaProvider {
@@ -127,3 +141,12 @@ impl SchemaProvider for DummySchemaProvider {
true
}
}
impl fmt::Debug for DummySchemaProvider {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DummySchemaProvider")
.field("catalog_name", &self.catalog_name)
.field("schema_name", &self.schema_name)
.finish()
}
}

View File

@@ -31,7 +31,7 @@ derive_builder.workspace = true
futures.workspace = true
lazy_static.workspace = true
object-store.workspace = true
orc-rust = { git = "https://github.com/datafusion-contrib/datafusion-orc.git", rev = "502217315726314c4008808fe169764529640599", default-features = false, features = [
orc-rust = { version = "0.5", default-features = false, features = [
"async",
] }
parquet.workspace = true

View File

@@ -126,8 +126,7 @@ impl ArrowDecoder for arrow::csv::reader::Decoder {
}
}
#[allow(deprecated)]
impl ArrowDecoder for arrow::json::RawDecoder {
impl ArrowDecoder for arrow::json::reader::Decoder {
fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError> {
self.decode(buf)
}

View File

@@ -17,8 +17,7 @@ use std::str::FromStr;
use std::sync::Arc;
use arrow::csv;
#[allow(deprecated)]
use arrow::csv::reader::infer_reader_schema as infer_csv_schema;
use arrow::csv::reader::Format;
use arrow::record_batch::RecordBatch;
use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
@@ -161,7 +160,6 @@ impl FileOpener for CsvOpener {
}
}
#[allow(deprecated)]
#[async_trait]
impl FileFormat for CsvFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
@@ -188,9 +186,12 @@ impl FileFormat for CsvFormat {
common_runtime::spawn_blocking_global(move || {
let reader = SyncIoBridge::new(decoded);
let (schema, _records_read) =
infer_csv_schema(reader, delimiter, schema_infer_max_record, has_header)
.context(error::InferSchemaSnafu)?;
let format = Format::default()
.with_delimiter(delimiter)
.with_header(has_header);
let (schema, _records_read) = format
.infer_schema(reader, schema_infer_max_record)
.context(error::InferSchemaSnafu)?;
Ok(schema)
})
.await
@@ -253,7 +254,7 @@ mod tests {
"c7: Int64: NULL",
"c8: Int64: NULL",
"c9: Int64: NULL",
"c10: Int64: NULL",
"c10: Utf8: NULL",
"c11: Float64: NULL",
"c12: Float64: NULL",
"c13: Utf8: NULL"

View File

@@ -20,8 +20,7 @@ use std::sync::Arc;
use arrow::datatypes::SchemaRef;
use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
use arrow::json::writer::LineDelimited;
#[allow(deprecated)]
use arrow::json::{self, RawReaderBuilder};
use arrow::json::{self, ReaderBuilder};
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
@@ -140,7 +139,6 @@ impl JsonOpener {
}
}
#[allow(deprecated)]
impl FileOpener for JsonOpener {
fn open(&self, meta: FileMeta) -> DataFusionResult<FileOpenFuture> {
open_with_decoder(
@@ -148,7 +146,7 @@ impl FileOpener for JsonOpener {
meta.location().to_string(),
self.compression_type,
|| {
RawReaderBuilder::new(self.projected_schema.clone())
ReaderBuilder::new(self.projected_schema.clone())
.with_batch_size(self.batch_size)
.build_decoder()
.map_err(DataFusionError::from)

View File

@@ -91,6 +91,7 @@ mod tests {
use std::sync::Arc;
use common_query::prelude::{TypeSignature, Volatility};
use datatypes::arrow::datatypes::IntervalDayTime;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use datatypes::vectors::{
@@ -134,7 +135,12 @@ mod tests {
let times = vec![Some(123), None, Some(42), None];
// Intervals in milliseconds
let intervals = vec![1000, 2000, 3000, 1000];
let intervals = vec![
IntervalDayTime::new(0, 1000),
IntervalDayTime::new(0, 2000),
IntervalDayTime::new(0, 3000),
IntervalDayTime::new(0, 1000),
];
let results = [Some(124), None, Some(45), None];
let time_vector = TimestampSecondVector::from(times.clone());

View File

@@ -91,6 +91,7 @@ mod tests {
use std::sync::Arc;
use common_query::prelude::{TypeSignature, Volatility};
use datatypes::arrow::datatypes::IntervalDayTime;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use datatypes::vectors::{
@@ -139,7 +140,12 @@ mod tests {
let times = vec![Some(123), None, Some(42), None];
// Intervals in milliseconds
let intervals = vec![1000, 2000, 3000, 1000];
let intervals = vec![
IntervalDayTime::new(0, 1000),
IntervalDayTime::new(0, 2000),
IntervalDayTime::new(0, 3000),
IntervalDayTime::new(0, 1000),
];
let results = [Some(122), None, Some(39), None];
let time_vector = TimestampSecondVector::from(times.clone());

View File

@@ -21,10 +21,9 @@ use common_query::error::{
};
use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeIterator, TreeNodeRecursion};
use datafusion::common::{DFSchema, Result as DfResult};
use datafusion::execution::context::SessionState;
use datafusion::execution::SessionStateBuilder;
use datafusion::logical_expr::{self, Expr, Volatility};
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use datafusion::prelude::SessionConfig;
use datatypes::arrow::array::RecordBatch;
use datatypes::arrow::datatypes::{DataType, Field};
use datatypes::prelude::VectorRef;
@@ -104,8 +103,7 @@ impl MatchesFunction {
let like_expr = ast.into_like_expr(col_name);
let input_schema = Self::input_schema();
let session_state =
SessionState::new_with_config_rt(SessionConfig::default(), Arc::default());
let session_state = SessionStateBuilder::new().with_default_features().build();
let planner = DefaultPhysicalPlanner::default();
let physical_expr = planner
.create_physical_expr(&like_expr, &input_schema, &session_state)
@@ -131,7 +129,7 @@ impl MatchesFunction {
}
fn input_schema() -> DFSchema {
DFSchema::from_unqualifed_fields(
DFSchema::from_unqualified_fields(
[Arc::new(Field::new("data", DataType::Utf8, true))].into(),
HashMap::new(),
)

View File

@@ -42,7 +42,7 @@ impl Function for BuildFunction {
}
fn signature(&self) -> Signature {
Signature::uniform(0, vec![], Volatility::Immutable)
Signature::nullary(Volatility::Immutable)
}
fn eval(&self, _func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
@@ -56,8 +56,6 @@ impl Function for BuildFunction {
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use super::*;
#[test]
fn test_build_function() {
@@ -67,12 +65,7 @@ mod tests {
ConcreteDataType::string_datatype(),
build.return_type(&[]).unwrap()
);
assert!(matches!(build.signature(),
Signature {
type_signature: TypeSignature::Uniform(0, valid_types),
volatility: Volatility::Immutable
} if valid_types.is_empty()
));
assert_eq!(build.signature(), Signature::nullary(Volatility::Immutable));
let build_info = common_version::build_info().to_string();
let vector = build.eval(FunctionContext::default(), &[]).unwrap();
let expect: VectorRef = Arc::new(StringVector::from(vec![build_info]));

View File

@@ -44,7 +44,7 @@ impl Function for DatabaseFunction {
}
fn signature(&self) -> Signature {
Signature::uniform(0, vec![], Volatility::Immutable)
Signature::nullary(Volatility::Immutable)
}
fn eval(&self, func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
@@ -116,7 +116,6 @@ impl fmt::Display for SessionUserFunction {
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use session::context::QueryContextBuilder;
use super::*;
@@ -128,12 +127,7 @@ mod tests {
ConcreteDataType::string_datatype(),
build.return_type(&[]).unwrap()
);
assert!(matches!(build.signature(),
Signature {
type_signature: TypeSignature::Uniform(0, valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![]
));
assert_eq!(build.signature(), Signature::nullary(Volatility::Immutable));
let query_ctx = QueryContextBuilder::default()
.current_schema("test_db".to_string())

View File

@@ -38,7 +38,7 @@ impl Function for TimezoneFunction {
}
fn signature(&self) -> Signature {
Signature::uniform(0, vec![], Volatility::Immutable)
Signature::nullary(Volatility::Immutable)
}
fn eval(&self, func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
@@ -58,7 +58,6 @@ impl fmt::Display for TimezoneFunction {
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use session::context::QueryContextBuilder;
use super::*;
@@ -70,12 +69,7 @@ mod tests {
ConcreteDataType::string_datatype(),
build.return_type(&[]).unwrap()
);
assert!(matches!(build.signature(),
Signature {
type_signature: TypeSignature::Uniform(0, valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![]
));
assert_eq!(build.signature(), Signature::nullary(Volatility::Immutable));
let query_ctx = QueryContextBuilder::default().build().into();

View File

@@ -42,7 +42,7 @@ impl Function for VersionFunction {
}
fn signature(&self) -> Signature {
Signature::exact(vec![], Volatility::Immutable)
Signature::nullary(Volatility::Immutable)
}
fn eval(&self, func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {

View File

@@ -107,7 +107,7 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: prost::DecodeError,
error: prost::UnknownEnumValue,
},
#[snafu(display(
@@ -137,7 +137,7 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: prost::DecodeError,
error: prost::UnknownEnumValue,
},
#[snafu(display("Missing alter index options"))]

View File

@@ -255,7 +255,7 @@ mod tests {
ConcreteDataType::from(
ColumnDataTypeWrapper::try_new(
decimal_column.data_type,
decimal_column.datatype_extension.clone(),
decimal_column.datatype_extension,
)
.unwrap()
)
@@ -351,7 +351,6 @@ mod tests {
.as_ref()
.unwrap()
.datatype_extension
.clone()
)
.unwrap()
)

View File

@@ -166,7 +166,7 @@ pub fn build_create_table_expr(
default_constraint: vec![],
semantic_type,
comment: String::new(),
datatype_extension: datatype_extension.clone(),
datatype_extension: *datatype_extension,
options: options.clone(),
});
}
@@ -208,7 +208,7 @@ pub fn extract_new_columns(
default_constraint: vec![],
semantic_type: expr.semantic_type,
comment: String::new(),
datatype_extension: expr.datatype_extension.clone(),
datatype_extension: *expr.datatype_extension,
options: expr.options.clone(),
});
AddColumn {

View File

@@ -19,7 +19,8 @@ common-telemetry.workspace = true
common-time.workspace = true
dashmap.workspace = true
datatypes.workspace = true
flatbuffers = "23.1"
flatbuffers = "24"
hyper.workspace = true
lazy_static.workspace = true
prost.workspace = true
snafu.workspace = true
@@ -29,6 +30,7 @@ tower.workspace = true
[dev-dependencies]
criterion = "0.4"
hyper-util = { workspace = true, features = ["tokio"] }
rand.workspace = true
[[bench]]

View File

@@ -25,7 +25,7 @@ use snafu::{OptionExt, ResultExt};
use tonic::transport::{
Certificate, Channel as InnerChannel, ClientTlsConfig, Endpoint, Identity, Uri,
};
use tower::make::MakeConnection;
use tower::Service;
use crate::error::{CreateChannelSnafu, InvalidConfigFilePathSnafu, InvalidTlsConfigSnafu, Result};
@@ -137,8 +137,8 @@ impl ChannelManager {
connector: C,
) -> Result<InnerChannel>
where
C: MakeConnection<Uri> + Send + 'static,
C::Connection: Unpin + Send + 'static,
C: Service<Uri> + Send + 'static,
C::Response: hyper::rt::Read + hyper::rt::Write + Send + Unpin,
C::Future: Send + 'static,
Box<dyn std::error::Error + Send + Sync>: From<C::Error> + Send + 'static,
{
@@ -607,7 +607,7 @@ mod tests {
});
let (client, _) = tokio::io::duplex(1024);
let mut client = Some(client);
let mut client = Some(hyper_util::rt::TokioIo::new(client));
let res = mgr.reset_with_connector(
addr,
service_fn(move |_| {

View File

@@ -205,7 +205,7 @@ pub fn values(arrays: &[VectorRef]) -> Result<Values> {
ConcreteDataType::Interval(IntervalType::DayTime(_)),
IntervalDayTimeVector,
interval_day_time_values,
|x| { x.into_native() }
|x| { x.to_i64() }
),
(
ConcreteDataType::Interval(IntervalType::MonthDayNano(_)),
@@ -232,6 +232,8 @@ pub fn values(arrays: &[VectorRef]) -> Result<Values> {
mod tests {
use std::sync::Arc;
use datatypes::arrow::datatypes::{IntervalDayTime, IntervalMonthDayNano};
use super::*;
#[test]
@@ -266,7 +268,12 @@ mod tests {
#[test]
fn test_convert_arrow_array_interval_day_time() {
let array = IntervalDayTimeVector::from(vec![Some(1), Some(2), None, Some(3)]);
let array = IntervalDayTimeVector::from(vec![
Some(IntervalDayTime::new(0, 1)),
Some(IntervalDayTime::new(0, 2)),
None,
Some(IntervalDayTime::new(0, 3)),
]);
let array: VectorRef = Arc::new(array);
let values = values(&[array]).unwrap();
@@ -276,7 +283,12 @@ mod tests {
#[test]
fn test_convert_arrow_array_interval_month_day_nano() {
let array = IntervalMonthDayNanoVector::from(vec![Some(1), Some(2), None, Some(3)]);
let array = IntervalMonthDayNanoVector::from(vec![
Some(IntervalMonthDayNano::new(0, 0, 1)),
Some(IntervalMonthDayNano::new(0, 0, 2)),
None,
Some(IntervalMonthDayNano::new(0, 0, 3)),
]);
let array: VectorRef = Arc::new(array);
let values = values(&[array]).unwrap();

View File

@@ -138,7 +138,7 @@ fn build_struct(
datafusion_expr::create_udf(
Self::name(),
Self::input_type(),
Arc::new(Self::return_type()),
Self::return_type(),
Volatility::Immutable,
Arc::new(Self::calc) as _,
)

View File

@@ -47,7 +47,7 @@ pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result<Crea
default_constraint: c.default_constraint.clone(),
semantic_type: semantic_type as i32,
comment: String::new(),
datatype_extension: c.datatype_extension.clone(),
datatype_extension: c.datatype_extension,
options: c.options.clone(),
}),
column_id: i as u32,

View File

@@ -378,6 +378,12 @@ impl FlowMetadataManager {
}
}
impl std::fmt::Debug for FlowMetadataManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FlowMetadataManager").finish()
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;

View File

@@ -290,7 +290,8 @@ mod tests {
num_per_range: u32,
max_bytes: u32,
) {
let num_cases = rand::thread_rng().gen_range(1..=26);
let num_cases = rand::thread_rng().gen_range(1..=8);
common_telemetry::info!("num_cases: {}", num_cases);
let mut cases = Vec::with_capacity(num_cases);
for i in 0..num_cases {
let size = rand::thread_rng().gen_range(size_limit..=max_bytes);
@@ -324,6 +325,7 @@ mod tests {
// Puts the values
for TestCase { key, value, .. } in &cases {
common_telemetry::info!("put key: {}, size: {}", key, value.len());
store.put(key, value.clone()).await.unwrap();
}
@@ -332,6 +334,7 @@ mod tests {
let data = walk_top_down(prefix).await;
assert_eq!(data.len(), 1);
let (keyset, got) = data.into_iter().next().unwrap();
common_telemetry::info!("get key: {}", keyset.key());
let num_expected_keys = value.len().div_ceil(size_limit as usize);
assert_eq!(&got, value);
assert_eq!(keyset.key(), key);
@@ -364,6 +367,7 @@ mod tests {
let prefix = "test_etcd_store_split_value/";
let endpoints = env::var("GT_ETCD_ENDPOINTS").unwrap_or_default();
let kv_backend: KvBackendRef = if endpoints.is_empty() {
common_telemetry::info!("Using MemoryKvBackend");
Arc::new(MemoryKvBackend::new())
} else {
let endpoints = endpoints

View File

@@ -237,6 +237,15 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to register UDF: {}", name))]
RegisterUdf {
name: String,
#[snafu(source)]
error: DataFusionError,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -268,7 +277,8 @@ impl ErrorExt for Error {
Error::MissingTableMutationHandler { .. }
| Error::MissingProcedureServiceHandler { .. }
| Error::MissingFlowServiceHandler { .. } => StatusCode::Unexpected,
| Error::MissingFlowServiceHandler { .. }
| Error::RegisterUdf { .. } => StatusCode::Unexpected,
Error::UnsupportedInputDataType { .. }
| Error::TypeCast { .. }

View File

@@ -214,8 +214,7 @@ mod tests {
];
// call the function
let result = (df_udf.fun())(&args).unwrap();
let result = df_udf.invoke_batch(&args, 4).unwrap();
match result {
DfColumnarValue::Array(arr) => {
let arr = arr.as_any().downcast_ref::<BooleanArray>().unwrap();
@@ -308,7 +307,7 @@ mod tests {
Projection: person.id AS a, person.name AS b
Filter: person.id > Int32(500)
TableScan: person"#,
format!("\n{:?}", new_plan)
format!("\n{}", new_plan)
);
}

View File

@@ -39,6 +39,10 @@ pub enum TypeSignature {
Any(usize),
/// One of a list of signatures
OneOf(Vec<TypeSignature>),
/// Zero argument
/// This is the new signature for functions with zero arguments
/// TODO(discord9): make all other usize nonzero usize
NullAry,
}
///The Signature of a function defines its supported input types as well as its volatility.
@@ -112,6 +116,13 @@ impl Signature {
volatility,
}
}
pub fn nullary(volatility: Volatility) -> Self {
Signature {
type_signature: TypeSignature::NullAry,
volatility,
}
}
}
/// Conversations between datafusion signature and our signature
@@ -122,16 +133,25 @@ impl From<TypeSignature> for DfTypeSignature {
DfTypeSignature::Variadic(concrete_types_to_arrow_types(types))
}
TypeSignature::Uniform(n, types) => {
if n == 0 {
return DfTypeSignature::NullAry;
}
DfTypeSignature::Uniform(n, concrete_types_to_arrow_types(types))
}
TypeSignature::Exact(types) => {
DfTypeSignature::Exact(concrete_types_to_arrow_types(types))
}
TypeSignature::Any(n) => DfTypeSignature::Any(n),
TypeSignature::Any(n) => {
if n == 0 {
return DfTypeSignature::NullAry;
}
DfTypeSignature::Any(n)
}
TypeSignature::OneOf(ts) => {
DfTypeSignature::OneOf(ts.into_iter().map(Into::into).collect())
}
TypeSignature::VariadicAny => DfTypeSignature::VariadicAny,
TypeSignature::NullAry => DfTypeSignature::NullAry,
}
}
}

View File

@@ -119,6 +119,10 @@ impl ExecutionPlan for StreamScanAdapter {
.ok_or_else(|| DataFusionError::Execution("Stream already exhausted".to_string()))?;
Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream)))
}
fn name(&self) -> &str {
"StreamScanAdapter"
}
}
#[cfg(test)]

View File

@@ -17,6 +17,7 @@ use std::slice;
use std::sync::Arc;
use datafusion::arrow::util::pretty::pretty_format_batches;
use datatypes::arrow::array::RecordBatchOptions;
use datatypes::prelude::DataType;
use datatypes::schema::SchemaRef;
use datatypes::value::Value;
@@ -73,6 +74,21 @@ impl RecordBatch {
}
}
/// Create an empty [`RecordBatch`] from `schema` with `num_rows`.
pub fn new_with_count(schema: SchemaRef, num_rows: usize) -> Result<Self> {
let df_record_batch = DfRecordBatch::try_new_with_options(
schema.arrow_schema().clone(),
vec![],
&RecordBatchOptions::new().with_row_count(Some(num_rows)),
)
.context(error::NewDfRecordBatchSnafu)?;
Ok(RecordBatch {
schema,
columns: vec![],
df_record_batch,
})
}
pub fn try_project(&self, indices: &[usize]) -> Result<Self> {
let schema = Arc::new(self.schema.try_project(indices).context(DataTypesSnafu)?);
let mut columns = Vec::with_capacity(indices.len());

View File

@@ -142,16 +142,25 @@ impl Builder {
impl BuilderBuild<DefaultRuntime> for Builder {
fn build(&mut self) -> Result<DefaultRuntime> {
let runtime = self
let builder = self
.builder
.enable_all()
.thread_name(self.thread_name.clone())
.on_thread_start(on_thread_start(self.thread_name.clone()))
.on_thread_stop(on_thread_stop(self.thread_name.clone()))
.on_thread_park(on_thread_park(self.thread_name.clone()))
.on_thread_unpark(on_thread_unpark(self.thread_name.clone()))
.build()
.context(BuildRuntimeSnafu)?;
.on_thread_unpark(on_thread_unpark(self.thread_name.clone()));
let runtime = if cfg!(debug_assertions) {
// Set the stack size to 8MB for the thread so it wouldn't overflow on large stack usage in debug mode
// This is necessary to avoid stack overflow while running sqlness.
// https://github.com/rust-lang/rust/issues/34283
builder
.thread_stack_size(8 * 1024 * 1024)
.build()
.context(BuildRuntimeSnafu)?
} else {
builder.build().context(BuildRuntimeSnafu)?
};
let name = self.runtime_name.clone();
let handle = runtime.handle().clone();

View File

@@ -16,10 +16,10 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::{Buf, Bytes, BytesMut};
use datafusion::catalog::CatalogProviderList;
use datafusion::execution::context::SessionState;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::SessionConfig;
use datafusion_expr::LogicalPlan;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use datafusion_substrait::logical_plan::producer::to_substrait_plan;
@@ -41,13 +41,10 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor {
async fn decode<B: Buf + Send>(
&self,
message: B,
catalog_list: Arc<dyn CatalogProviderList>,
state: SessionState,
) -> Result<Self::Plan, Self::Error> {
let mut context = SessionContext::new_with_state(state);
context.register_catalog_list(catalog_list);
let plan = Plan::decode(message).context(DecodeRelSnafu)?;
let df_plan = from_substrait_plan(&context, &plan)
let df_plan = from_substrait_plan(&state, &plan)
.await
.context(DecodeDfPlanSnafu)?;
Ok(df_plan)
@@ -72,11 +69,12 @@ impl DFLogicalSubstraitConvertor {
plan: &LogicalPlan,
serializer: impl SerializerRegistry + 'static,
) -> Result<Box<Plan>, Error> {
let session_state =
SessionState::new_with_config_rt(SessionConfig::new(), Arc::new(RuntimeEnv::default()))
.with_serializer_registry(Arc::new(serializer));
let context = SessionContext::new_with_state(session_state);
to_substrait_plan(plan, &context).context(EncodeDfPlanSnafu)
let state = SessionStateBuilder::new()
.with_config(SessionConfig::new())
.with_runtime_env(Arc::new(RuntimeEnv::default()))
.with_default_features()
.with_serializer_registry(Arc::new(serializer))
.build();
to_substrait_plan(plan, &state).context(EncodeDfPlanSnafu)
}
}

View File

@@ -22,6 +22,7 @@ use promql::extension_plan::{
EmptyMetric, InstantManipulate, RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize,
};
#[derive(Debug)]
pub struct ExtensionSerializer;
impl SerializerRegistry for ExtensionSerializer {

View File

@@ -17,11 +17,9 @@
mod df_substrait;
pub mod error;
pub mod extension_serializer;
use std::sync::Arc;
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use datafusion::catalog::CatalogProviderList;
use datafusion::execution::context::SessionState;
pub use datafusion::execution::registry::SerializerRegistry;
/// Re-export the Substrait module of datafusion,
@@ -40,7 +38,6 @@ pub trait SubstraitPlan {
async fn decode<B: Buf + Send>(
&self,
message: B,
catalog_list: Arc<dyn CatalogProviderList>,
state: SessionState,
) -> Result<Self::Plan, Self::Error>;

View File

@@ -22,6 +22,6 @@ static PORTS: OnceCell<AtomicUsize> = OnceCell::new();
/// Return a unique port(in runtime) for test
pub fn get_port() -> usize {
PORTS
.get_or_init(|| AtomicUsize::new(rand::thread_rng().gen_range(3000..3800)))
.get_or_init(|| AtomicUsize::new(rand::thread_rng().gen_range(13000..13800)))
.fetch_add(1, Ordering::Relaxed)
}

View File

@@ -184,6 +184,24 @@ impl From<IntervalDayTime> for serde_json::Value {
}
}
impl From<arrow::datatypes::IntervalDayTime> for IntervalDayTime {
fn from(value: arrow::datatypes::IntervalDayTime) -> Self {
Self {
days: value.days,
milliseconds: value.milliseconds,
}
}
}
impl From<IntervalDayTime> for arrow::datatypes::IntervalDayTime {
fn from(value: IntervalDayTime) -> Self {
Self {
days: value.days,
milliseconds: value.milliseconds,
}
}
}
// Millisecond convert to other time unit
pub const MS_PER_SEC: i64 = 1_000;
pub const MS_PER_MINUTE: i64 = 60 * MS_PER_SEC;
@@ -283,6 +301,26 @@ impl From<IntervalMonthDayNano> for serde_json::Value {
}
}
impl From<arrow::datatypes::IntervalMonthDayNano> for IntervalMonthDayNano {
fn from(value: arrow::datatypes::IntervalMonthDayNano) -> Self {
Self {
months: value.months,
days: value.days,
nanoseconds: value.nanoseconds,
}
}
}
impl From<IntervalMonthDayNano> for arrow::datatypes::IntervalMonthDayNano {
fn from(value: IntervalMonthDayNano) -> Self {
Self {
months: value.months,
days: value.days,
nanoseconds: value.nanoseconds,
}
}
}
// Nanosecond convert to other time unit
pub const NS_PER_SEC: i64 = 1_000_000_000;
pub const NS_PER_MINUTE: i64 = 60 * NS_PER_SEC;

View File

@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow::datatypes::{
IntervalDayTime as ArrowIntervalDayTime, IntervalMonthDayNano as ArrowIntervalMonthDayNano,
};
use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
use paste::paste;
@@ -53,11 +56,11 @@ macro_rules! define_interval_with_unit {
type Native = $native_ty;
fn from_native(value: Self::Native) -> Self {
Self::[<from_ $native_ty>](value)
Self::from(value)
}
fn into_native(self) -> Self::Native {
self.[<to_ $native_ty>]()
self.into()
}
}
}
@@ -65,8 +68,8 @@ macro_rules! define_interval_with_unit {
}
define_interval_with_unit!(YearMonth, i32);
define_interval_with_unit!(DayTime, i64);
define_interval_with_unit!(MonthDayNano, i128);
define_interval_with_unit!(DayTime, ArrowIntervalDayTime);
define_interval_with_unit!(MonthDayNano, ArrowIntervalMonthDayNano);
#[cfg(test)]
mod tests {
@@ -82,12 +85,15 @@ mod tests {
let interval = IntervalDayTime::from(1000);
assert_eq!(interval, interval.as_scalar_ref());
assert_eq!(interval, interval.to_owned_scalar());
assert_eq!(1000, interval.into_native());
assert_eq!(ArrowIntervalDayTime::from(interval), interval.into_native());
let interval = IntervalMonthDayNano::from(1000);
assert_eq!(interval, interval.as_scalar_ref());
assert_eq!(interval, interval.to_owned_scalar());
assert_eq!(1000, interval.into_native());
assert_eq!(
ArrowIntervalMonthDayNano::from(interval),
interval.into_native()
);
}
#[test]

View File

@@ -447,7 +447,21 @@ impl TryFrom<&Field> for ColumnSchema {
}
None => None,
};
let is_time_index = metadata.contains_key(TIME_INDEX_KEY);
let mut is_time_index = metadata.contains_key(TIME_INDEX_KEY);
if is_time_index && !data_type.is_timestamp() {
// If the column is time index but the data type is not timestamp, it is invalid.
// We set the time index to false and remove the metadata.
// This is possible if we cast the time index column to another type. DataFusion will
// keep the metadata:
// https://github.com/apache/datafusion/pull/12951
is_time_index = false;
metadata.remove(TIME_INDEX_KEY);
common_telemetry::debug!(
"Column {} is not timestamp ({:?}) but has time index metadata",
data_type,
field.name(),
);
}
Ok(ColumnSchema {
name: field.name().clone(),
@@ -657,7 +671,7 @@ impl TryFrom<HashMap<String, String>> for SkippingIndexOptions {
mod tests {
use std::sync::Arc;
use arrow::datatypes::DataType as ArrowDataType;
use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
use super::*;
use crate::value::Value;
@@ -879,4 +893,40 @@ mod tests {
&ConcreteDataType::vector_datatype(3).name()
);
}
#[test]
fn test_column_schema_fix_time_index() {
let field = Field::new(
"test",
ArrowDataType::Timestamp(TimeUnit::Second, None),
false,
);
let field = field.with_metadata(Metadata::from([(
TIME_INDEX_KEY.to_string(),
"true".to_string(),
)]));
let column_schema = ColumnSchema::try_from(&field).unwrap();
assert_eq!("test", column_schema.name);
assert_eq!(
ConcreteDataType::timestamp_second_datatype(),
column_schema.data_type
);
assert!(!column_schema.is_nullable);
assert!(column_schema.is_time_index);
assert!(column_schema.default_constraint.is_none());
assert_eq!(1, column_schema.metadata().len());
let field = Field::new("test", ArrowDataType::Int32, false);
let field = field.with_metadata(Metadata::from([(
TIME_INDEX_KEY.to_string(),
"true".to_string(),
)]));
let column_schema = ColumnSchema::try_from(&field).unwrap();
assert_eq!("test", column_schema.name);
assert_eq!(ConcreteDataType::int32_datatype(), column_schema.data_type);
assert!(!column_schema.is_nullable);
assert!(!column_schema.is_time_index);
assert!(column_schema.default_constraint.is_none());
assert!(column_schema.metadata.is_empty());
}
}

View File

@@ -13,7 +13,9 @@
// limitations under the License.
use arrow::datatypes::{
DataType as ArrowDataType, IntervalDayTimeType as ArrowIntervalDayTimeType,
DataType as ArrowDataType, IntervalDayTime as ArrowIntervalDayTime,
IntervalDayTimeType as ArrowIntervalDayTimeType,
IntervalMonthDayNano as ArrowIntervalMonthDayNano,
IntervalMonthDayNanoType as ArrowIntervalMonthDayNanoType, IntervalUnit as ArrowIntervalUnit,
IntervalYearMonthType as ArrowIntervalYearMonthType,
};
@@ -137,8 +139,8 @@ macro_rules! impl_data_type_for_interval {
}
impl_data_type_for_interval!(YearMonth, i32);
impl_data_type_for_interval!(DayTime, i64);
impl_data_type_for_interval!(MonthDayNano, i128);
impl_data_type_for_interval!(DayTime, ArrowIntervalDayTime);
impl_data_type_for_interval!(MonthDayNano, ArrowIntervalMonthDayNano);
#[cfg(test)]
mod tests {

View File

@@ -411,8 +411,8 @@ impl Value {
Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())),
Value::Time(t) => time_to_scalar_value(*t.unit(), Some(t.value()))?,
Value::IntervalYearMonth(v) => ScalarValue::IntervalYearMonth(Some(v.to_i32())),
Value::IntervalDayTime(v) => ScalarValue::IntervalDayTime(Some(v.to_i64())),
Value::IntervalMonthDayNano(v) => ScalarValue::IntervalMonthDayNano(Some(v.to_i128())),
Value::IntervalDayTime(v) => ScalarValue::IntervalDayTime(Some((*v).into())),
Value::IntervalMonthDayNano(v) => ScalarValue::IntervalMonthDayNano(Some((*v).into())),
Value::Duration(d) => duration_to_scalar_value(d.unit(), Some(d.value())),
Value::Decimal128(d) => {
let (v, p, s) = d.to_scalar_value();
@@ -852,6 +852,7 @@ impl ListValue {
Ok(ScalarValue::List(ScalarValue::new_list(
&vs,
&self.datatype.as_arrow_type(),
true,
)))
}
@@ -964,10 +965,10 @@ impl TryFrom<ScalarValue> for Value {
.map(|x| Value::IntervalYearMonth(IntervalYearMonth::from_i32(x)))
.unwrap_or(Value::Null),
ScalarValue::IntervalDayTime(t) => t
.map(|x| Value::IntervalDayTime(IntervalDayTime::from_i64(x)))
.map(|x| Value::IntervalDayTime(IntervalDayTime::from(x)))
.unwrap_or(Value::Null),
ScalarValue::IntervalMonthDayNano(t) => t
.map(|x| Value::IntervalMonthDayNano(IntervalMonthDayNano::from_i128(x)))
.map(|x| Value::IntervalMonthDayNano(IntervalMonthDayNano::from(x)))
.unwrap_or(Value::Null),
ScalarValue::DurationSecond(d) => d
.map(|x| Value::Duration(Duration::new(x, TimeUnit::Second)))
@@ -990,7 +991,10 @@ impl TryFrom<ScalarValue> for Value {
| ScalarValue::LargeList(_)
| ScalarValue::Dictionary(_, _)
| ScalarValue::Union(_, _, _)
| ScalarValue::Float16(_) => {
| ScalarValue::Float16(_)
| ScalarValue::Utf8View(_)
| ScalarValue::BinaryView(_)
| ScalarValue::Map(_) => {
return error::UnsupportedArrowTypeSnafu {
arrow_type: v.data_type(),
}
@@ -1733,6 +1737,7 @@ mod tests {
ScalarValue::List(ScalarValue::new_list(
&[ScalarValue::Int32(Some(1)), ScalarValue::Int32(None)],
&ArrowDataType::Int32,
true,
))
.try_into()
.unwrap()
@@ -1742,7 +1747,7 @@ mod tests {
vec![],
ConcreteDataType::list_datatype(ConcreteDataType::uint32_datatype())
)),
ScalarValue::List(ScalarValue::new_list(&[], &ArrowDataType::UInt32))
ScalarValue::List(ScalarValue::new_list(&[], &ArrowDataType::UInt32, true))
.try_into()
.unwrap()
);
@@ -1814,7 +1819,7 @@ mod tests {
);
assert_eq!(
Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 1)),
ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano::new(1, 1, 1).to_i128()))
ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano::new(1, 1, 1).into()))
.try_into()
.unwrap()
);

View File

@@ -159,6 +159,8 @@ fn equal(lhs: &dyn Vector, rhs: &dyn Vector) -> bool {
#[cfg(test)]
mod tests {
use arrow::datatypes::{IntervalDayTime, IntervalMonthDayNano};
use super::*;
use crate::vectors::{
list, DurationMicrosecondVector, DurationMillisecondVector, DurationNanosecondVector,
@@ -236,10 +238,16 @@ mod tests {
1000, 2000, 3000, 4000,
])));
assert_vector_ref_eq(Arc::new(IntervalDayTimeVector::from_values([
1000, 2000, 3000, 4000,
IntervalDayTime::new(1, 1000),
IntervalDayTime::new(1, 2000),
IntervalDayTime::new(1, 3000),
IntervalDayTime::new(1, 4000),
])));
assert_vector_ref_eq(Arc::new(IntervalMonthDayNanoVector::from_values([
1000, 2000, 3000, 4000,
IntervalMonthDayNano::new(1, 1, 1000),
IntervalMonthDayNano::new(1, 1, 2000),
IntervalMonthDayNano::new(1, 1, 3000),
IntervalMonthDayNano::new(1, 1, 4000),
])));
assert_vector_ref_eq(Arc::new(DurationSecondVector::from_values([300, 310])));
assert_vector_ref_eq(Arc::new(DurationMillisecondVector::from_values([300, 310])));
@@ -302,12 +310,24 @@ mod tests {
);
assert_vector_ref_ne(
Arc::new(IntervalDayTimeVector::from_values([1000, 2000])),
Arc::new(IntervalDayTimeVector::from_values([2100, 1200])),
Arc::new(IntervalDayTimeVector::from_values([
IntervalDayTime::new(1, 1000),
IntervalDayTime::new(1, 2000),
])),
Arc::new(IntervalDayTimeVector::from_values([
IntervalDayTime::new(1, 2100),
IntervalDayTime::new(1, 1200),
])),
);
assert_vector_ref_ne(
Arc::new(IntervalMonthDayNanoVector::from_values([1000, 2000])),
Arc::new(IntervalMonthDayNanoVector::from_values([2100, 1200])),
Arc::new(IntervalMonthDayNanoVector::from_values([
IntervalMonthDayNano::new(1, 1, 1000),
IntervalMonthDayNano::new(1, 1, 2000),
])),
Arc::new(IntervalMonthDayNanoVector::from_values([
IntervalMonthDayNano::new(1, 1, 2100),
IntervalMonthDayNano::new(1, 1, 1200),
])),
);
assert_vector_ref_ne(
Arc::new(IntervalYearMonthVector::from_values([1000, 2000])),

View File

@@ -241,7 +241,10 @@ impl Helper {
| ScalarValue::LargeList(_)
| ScalarValue::Dictionary(_, _)
| ScalarValue::Union(_, _, _)
| ScalarValue::Float16(_) => {
| ScalarValue::Float16(_)
| ScalarValue::Utf8View(_)
| ScalarValue::BinaryView(_)
| ScalarValue::Map(_) => {
return error::ConversionSnafu {
from: format!("Unsupported scalar value: {value}"),
}
@@ -415,13 +418,13 @@ mod tests {
TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow::buffer::Buffer;
use arrow::datatypes::Int32Type;
use arrow::datatypes::{Int32Type, IntervalMonthDayNano};
use arrow_array::{BinaryArray, DictionaryArray, FixedSizeBinaryArray, LargeStringArray};
use arrow_schema::DataType;
use common_decimal::Decimal128;
use common_time::time::Time;
use common_time::timestamp::TimeUnit;
use common_time::{Date, DateTime, Duration, IntervalMonthDayNano};
use common_time::{Date, DateTime, Duration};
use super::*;
use crate::value::Value;
@@ -509,6 +512,7 @@ mod tests {
let value = ScalarValue::List(ScalarValue::new_list(
&[ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(2))],
&ArrowDataType::Int32,
true,
));
let vector = Helper::try_from_scalar_value(value, 3).unwrap();
assert_eq!(
@@ -679,9 +683,11 @@ mod tests {
#[test]
fn test_try_from_scalar_interval_value() {
let vector =
Helper::try_from_scalar_value(ScalarValue::IntervalMonthDayNano(Some(2000)), 3)
.unwrap();
let vector = Helper::try_from_scalar_value(
ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano::new(1, 1, 2000))),
3,
)
.unwrap();
assert_eq!(
ConcreteDataType::interval_month_day_nano_datatype(),
@@ -690,7 +696,7 @@ mod tests {
assert_eq!(3, vector.len());
for i in 0..vector.len() {
assert_eq!(
Value::IntervalMonthDayNano(IntervalMonthDayNano::from_i128(2000)),
Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 2000).into()),
vector.get(i)
);
}

View File

@@ -406,7 +406,7 @@ mod tests {
Int32Array, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
Time64NanosecondArray,
};
use arrow::datatypes::DataType as ArrowDataType;
use arrow::datatypes::{DataType as ArrowDataType, IntervalDayTime};
use arrow_array::{
DurationMicrosecondArray, DurationMillisecondArray, DurationNanosecondArray,
DurationSecondArray, IntervalDayTimeArray, IntervalYearMonthArray,
@@ -650,10 +650,18 @@ mod tests {
vector
);
let array: ArrayRef = Arc::new(IntervalDayTimeArray::from(vec![1000, 2000, 3000]));
let array: ArrayRef = Arc::new(IntervalDayTimeArray::from(vec![
IntervalDayTime::new(1, 1000),
IntervalDayTime::new(1, 2000),
IntervalDayTime::new(1, 3000),
]));
let vector = IntervalDayTimeVector::try_from_arrow_array(array).unwrap();
assert_eq!(
IntervalDayTimeVector::from_values(vec![1000, 2000, 3000]),
IntervalDayTimeVector::from_values(vec![
IntervalDayTime::new(1, 1000),
IntervalDayTime::new(1, 2000),
IntervalDayTime::new(1, 3000),
]),
vector
);

View File

@@ -37,6 +37,7 @@ datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion-physical-expr.workspace = true
datafusion-substrait.workspace = true
datatypes.workspace = true
enum-as-inner = "0.6.0"
enum_dispatch = "0.3"

View File

@@ -185,12 +185,10 @@ pub fn table_info_value_to_relation_desc(
pub fn from_proto_to_data_type(
column_schema: &api::v1::ColumnSchema,
) -> Result<ConcreteDataType, Error> {
let wrapper = ColumnDataTypeWrapper::try_new(
column_schema.datatype,
column_schema.datatype_extension.clone(),
)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let wrapper =
ColumnDataTypeWrapper::try_new(column_schema.datatype, column_schema.datatype_extension)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let cdt = ConcreteDataType::from(wrapper);
Ok(cdt)

View File

@@ -23,6 +23,8 @@ use common_error::ext::BoxedError;
use common_telemetry::debug;
use datafusion::config::ConfigOptions;
use datafusion::error::DataFusionError;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
use datafusion::optimizer::analyzer::type_coercion::TypeCoercion;
use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
@@ -35,8 +37,6 @@ use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor,
};
use datafusion_common::{Column, DFSchema, ScalarValue};
use datafusion_expr::aggregate_function::AggregateFunction;
use datafusion_expr::expr::AggregateFunctionDefinition;
use datafusion_expr::utils::merge_schema;
use datafusion_expr::{
BinaryExpr, Expr, Operator, Projection, ScalarUDFImpl, Signature, TypeSignature, Volatility,
@@ -126,6 +126,7 @@ pub async fn sql_to_flow_plan(
Ok(flow_plan)
}
#[derive(Debug)]
struct AvgExpandRule {}
impl AvgExpandRule {
@@ -238,7 +239,7 @@ fn put_aggr_to_proj_analyzer(
fn expand_avg_analyzer(
plan: datafusion_expr::LogicalPlan,
) -> Result<Transformed<datafusion_expr::LogicalPlan>, DataFusionError> {
let mut schema = merge_schema(plan.inputs());
let mut schema = merge_schema(&plan.inputs());
if let datafusion_expr::LogicalPlan::TableScan(ts) = &plan {
let source_schema =
@@ -251,9 +252,10 @@ fn expand_avg_analyzer(
let name_preserver = NamePreserver::new(&plan);
// apply coercion rewrite all expressions in the plan individually
plan.map_expressions(|expr| {
let original_name = name_preserver.save(&expr)?;
expr.rewrite(&mut expr_rewrite)?
.map_data(|expr| original_name.restore(expr))
let original_name = name_preserver.save(&expr);
Ok(expr
.rewrite(&mut expr_rewrite)?
.update_data(|expr| original_name.restore(expr)))
})?
.map_data(|plan| plan.recompute_schema())
}
@@ -280,12 +282,10 @@ impl TreeNodeRewriter for ExpandAvgRewriter<'_> {
fn f_up(&mut self, expr: Expr) -> Result<Transformed<Expr>, DataFusionError> {
if let Expr::AggregateFunction(aggr_func) = &expr {
if let AggregateFunctionDefinition::BuiltIn(AggregateFunction::Avg) =
&aggr_func.func_def
{
if aggr_func.func.name() == "avg" {
let sum_expr = {
let mut tmp = aggr_func.clone();
tmp.func_def = AggregateFunctionDefinition::BuiltIn(AggregateFunction::Sum);
tmp.func = sum_udaf();
Expr::AggregateFunction(tmp)
};
let sum_cast = {
@@ -299,7 +299,7 @@ impl TreeNodeRewriter for ExpandAvgRewriter<'_> {
let count_expr = {
let mut tmp = aggr_func.clone();
tmp.func_def = AggregateFunctionDefinition::BuiltIn(AggregateFunction::Count);
tmp.func = count_udaf();
Expr::AggregateFunction(tmp)
};
@@ -329,6 +329,7 @@ impl TreeNodeRewriter for ExpandAvgRewriter<'_> {
}
/// expand tumble in aggr expr to tumble_start and tumble_end with column name like `window_start`
#[derive(Debug)]
struct TumbleExpandRule {}
impl TumbleExpandRule {
@@ -528,6 +529,7 @@ impl ScalarUDFImpl for TumbleExpand {
}
/// This rule check all group by exprs, and make sure they are also in select clause in a aggr query
#[derive(Debug)]
struct CheckGroupByRule {}
impl CheckGroupByRule {

View File

@@ -829,18 +829,8 @@ impl BinaryFunc {
arg_types: &[Option<ConcreteDataType>],
) -> Result<(Self, Signature), Error> {
// this `name_to_op` if error simply return a similar message of `unsupported function xxx` so
let op = name_to_op(name).or_else(|err| {
if let datafusion_common::DataFusionError::NotImplemented(msg) = err {
InvalidQuerySnafu {
reason: format!("Unsupported binary function: {}", msg),
}
.fail()
} else {
InvalidQuerySnafu {
reason: format!("Error when parsing binary function: {:?}", err),
}
.fail()
}
let op = name_to_op(name).with_context(|| InvalidQuerySnafu {
reason: format!("Unsupported binary function: {}", name),
})?;
// get first arg type and make sure if both is some, they are the same

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::OnceLock;
use datatypes::prelude::ConcreteDataType;
@@ -21,10 +20,10 @@ use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use serde::{Deserialize, Serialize};
use smallvec::smallvec;
use snafu::{IntoError, OptionExt};
use snafu::OptionExt;
use strum::{EnumIter, IntoEnumIterator};
use crate::error::{DatafusionSnafu, Error, InvalidQuerySnafu};
use crate::error::{Error, InvalidQuerySnafu};
use crate::expr::error::EvalError;
use crate::expr::relation::accum::{Accum, Accumulator};
use crate::expr::signature::{GenericFn, Signature};
@@ -215,28 +214,14 @@ impl AggregateFunc {
}
spec
});
use datafusion_expr::aggregate_function::AggregateFunction as DfAggrFunc;
let df_aggr_func = DfAggrFunc::from_str(name).or_else(|err| {
if let datafusion_common::DataFusionError::NotImplemented(msg) = err {
InvalidQuerySnafu {
reason: format!("Unsupported aggregate function: {}", msg),
}
.fail()
} else {
Err(DatafusionSnafu {
context: "Error when parsing aggregate function",
}
.into_error(err))
}
})?;
let generic_fn = match df_aggr_func {
DfAggrFunc::Max => GenericFn::Max,
DfAggrFunc::Min => GenericFn::Min,
DfAggrFunc::Sum => GenericFn::Sum,
DfAggrFunc::Count => GenericFn::Count,
DfAggrFunc::BoolOr => GenericFn::Any,
DfAggrFunc::BoolAnd => GenericFn::All,
let generic_fn = match name {
"max" => GenericFn::Max,
"min" => GenericFn::Min,
"sum" => GenericFn::Sum,
"count" => GenericFn::Count,
"bool_or" => GenericFn::Any,
"bool_and" => GenericFn::All,
_ => {
return InvalidQuerySnafu {
reason: format!("Unknown aggregate function: {}", name),

View File

@@ -63,6 +63,7 @@ impl TypedPlan {
pub fn projection(self, exprs: Vec<TypedExpr>) -> Result<Self, Error> {
let input_arity = self.schema.typ.column_types.len();
let output_arity = exprs.len();
let (exprs, _expr_typs): (Vec<_>, Vec<_>) = exprs
.into_iter()
.map(|TypedExpr { expr, typ }| (expr, typ))
@@ -72,6 +73,7 @@ impl TypedPlan {
.project(input_arity..input_arity + output_arity)?
.into_safe();
let out_typ = self.schema.apply_mfp(&mfp)?;
let mfp = mfp.mfp;
// special case for mfp to compose when the plan is already mfp
let plan = match self.plan {

View File

@@ -13,10 +13,11 @@
// limitations under the License.
//! Transform Substrait into execution plan
use std::collections::{BTreeMap, HashMap};
use std::collections::BTreeMap;
use std::sync::Arc;
use common_error::ext::BoxedError;
use datafusion_substrait::extensions::Extensions;
use datatypes::data_type::ConcreteDataType as CDT;
use query::QueryEngine;
use serde::{Deserialize, Serialize};
@@ -92,8 +93,15 @@ impl FunctionExtensions {
self.anchor_to_name.get(anchor)
}
pub fn inner_ref(&self) -> HashMap<u32, &String> {
self.anchor_to_name.iter().map(|(k, v)| (*k, v)).collect()
pub fn to_extensions(&self) -> Extensions {
Extensions {
functions: self
.anchor_to_name
.iter()
.map(|(k, v)| (*k, v.clone()))
.collect(),
..Default::default()
}
}
}
@@ -179,6 +187,7 @@ mod test {
pub fn create_test_ctx() -> FlownodeContext {
let mut tri_map = IdToNameMap::new();
// FIXME(discord9): deprecated, use `numbers_with_ts` instead since this table has no timestamp column
{
let gid = GlobalId::User(0);
let name = [

View File

@@ -14,10 +14,10 @@
use itertools::Itertools;
use snafu::OptionExt;
use substrait_proto::proto;
use substrait_proto::proto::aggregate_function::AggregationInvocation;
use substrait_proto::proto::aggregate_rel::{Grouping, Measure};
use substrait_proto::proto::function_argument::ArgType;
use substrait_proto::proto::{self};
use crate::error::{Error, NotImplementedSnafu, PlanSnafu};
use crate::expr::{
@@ -28,8 +28,11 @@ use crate::repr::{ColumnType, RelationDesc, RelationType};
use crate::transform::{substrait_proto, FlownodeContext, FunctionExtensions};
impl TypedExpr {
/// Allow `deprecated` due to the usage of deprecated grouping_expressions on datafusion to substrait side
#[allow(deprecated)]
async fn from_substrait_agg_grouping(
ctx: &mut FlownodeContext,
grouping_expressions: &[proto::Expression],
groupings: &[Grouping],
typ: &RelationDesc,
extensions: &FunctionExtensions,
@@ -38,7 +41,34 @@ impl TypedExpr {
let mut group_expr = vec![];
match groupings.len() {
1 => {
for e in &groupings[0].grouping_expressions {
// handle case when deprecated grouping_expressions is referenced by index is empty
let expressions: Box<dyn Iterator<Item = &proto::Expression> + Send> = if groupings
[0]
.expression_references
.is_empty()
{
Box::new(groupings[0].grouping_expressions.iter())
} else {
if groupings[0]
.expression_references
.iter()
.any(|idx| *idx as usize >= grouping_expressions.len())
{
return PlanSnafu {
reason: format!("Invalid grouping expression reference: {:?} for grouping expr: {:?}",
groupings[0].expression_references,
grouping_expressions
),
}.fail()?;
}
Box::new(
groupings[0]
.expression_references
.iter()
.map(|idx| &grouping_expressions[*idx as usize]),
)
};
for e in expressions {
let x = TypedExpr::from_substrait_rex(e, typ, extensions).await?;
group_expr.push(x);
}
@@ -251,9 +281,14 @@ impl TypedPlan {
return not_impl_err!("Aggregate without an input is not supported");
};
let group_exprs =
TypedExpr::from_substrait_agg_grouping(ctx, &agg.groupings, &input.schema, extensions)
.await?;
let group_exprs = TypedExpr::from_substrait_agg_grouping(
ctx,
&agg.grouping_expressions,
&agg.groupings,
&input.schema,
extensions,
)
.await?;
let time_index = find_time_index_in_group_exprs(&group_exprs);
@@ -339,7 +374,6 @@ impl TypedPlan {
reduce_plan: ReducePlan::Accumulable(accum_plan),
};
// FIX(discord9): deal with key first
return Ok(TypedPlan {
schema: output_type,
plan,
@@ -349,7 +383,6 @@ impl TypedPlan {
#[cfg(test)]
mod test {
use std::collections::BTreeMap;
use std::time::Duration;
use bytes::BytesMut;
@@ -390,7 +423,7 @@ mod test {
.with_key(vec![2])
.with_time_index(Some(1))
.into_named(vec![
Some("SUM(abs(numbers_with_ts.number))".to_string()),
Some("sum(abs(numbers_with_ts.number))".to_string()),
Some("window_start".to_string()),
Some("window_end".to_string()),
]),
@@ -454,14 +487,15 @@ mod test {
false,
)])
.into_unnamed(),
extensions: FunctionExtensions {
anchor_to_name: BTreeMap::from([
extensions: FunctionExtensions::from_iter(
[
(0, "tumble_start".to_string()),
(1, "tumble_end".to_string()),
(2, "abs".to_string()),
(3, "sum".to_string()),
]),
},
]
.into_iter(),
),
},
)
.await
@@ -530,7 +564,7 @@ mod test {
.with_key(vec![2])
.with_time_index(Some(1))
.into_named(vec![
Some("abs(SUM(numbers_with_ts.number))".to_string()),
Some("abs(sum(numbers_with_ts.number))".to_string()),
Some("window_start".to_string()),
Some("window_end".to_string()),
]),
@@ -615,14 +649,15 @@ mod test {
true,
)])
.into_unnamed(),
extensions: FunctionExtensions {
anchor_to_name: BTreeMap::from([
extensions: FunctionExtensions::from_iter(
[
(0, "abs".to_string()),
(1, "tumble_start".to_string()),
(2, "tumble_end".to_string()),
(3, "sum".to_string()),
]),
},
]
.into_iter(),
),
})
.await
.unwrap(),
@@ -784,8 +819,8 @@ mod test {
.with_key(vec![0, 3])
.with_time_index(Some(2))
.into_named(vec![
Some("numbers_with_ts.number".to_string()),
Some("AVG(numbers_with_ts.number)".to_string()),
Some("number".to_string()),
Some("avg(numbers_with_ts.number)".to_string()),
Some("window_start".to_string()),
Some("window_end".to_string()),
]),
@@ -818,7 +853,7 @@ mod test {
.with_key(vec![2])
.with_time_index(Some(1))
.into_named(vec![
Some("SUM(numbers_with_ts.number)".to_string()),
Some("sum(numbers_with_ts.number)".to_string()),
Some("window_start".to_string()),
Some("window_end".to_string()),
]),
@@ -928,7 +963,7 @@ mod test {
.with_key(vec![2])
.with_time_index(Some(1))
.into_named(vec![
Some("SUM(numbers_with_ts.number)".to_string()),
Some("sum(numbers_with_ts.number)".to_string()),
Some("window_start".to_string()),
Some("window_end".to_string()),
]),
@@ -1060,8 +1095,8 @@ mod test {
])
.with_key(vec![1])
.into_named(vec![
Some("AVG(numbers.number)".to_string()),
Some("numbers.number".to_string()),
Some("avg(numbers.number)".to_string()),
Some("number".to_string()),
]),
plan: Plan::Mfp {
input: Box::new(
@@ -1192,7 +1227,7 @@ mod test {
);
let expected = TypedPlan {
schema: RelationType::new(vec![ColumnType::new(CDT::float64_datatype(), true)])
.into_named(vec![Some("AVG(numbers.number)".to_string())]),
.into_named(vec![Some("avg(numbers.number)".to_string())]),
plan: Plan::Mfp {
input: Box::new(
Plan::Reduce {
@@ -1270,7 +1305,7 @@ mod test {
};
let expected = TypedPlan {
schema: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)])
.into_named(vec![Some("SUM(numbers.number)".to_string())]),
.into_named(vec![Some("sum(numbers.number)".to_string())]),
plan: Plan::Reduce {
input: Box::new(
Plan::Get {
@@ -1309,6 +1344,61 @@ mod test {
assert_eq!(flow_plan.unwrap(), expected);
}
#[tokio::test]
async fn test_distinct_number() {
let engine = create_test_query_engine();
let sql = "SELECT DISTINCT number FROM numbers";
let plan = sql_to_substrait(engine.clone(), sql).await;
let mut ctx = create_test_ctx();
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan)
.await
.unwrap();
let expected = TypedPlan {
schema: RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), false), // col number
])
.with_key(vec![0])
.into_named(vec![Some("number".to_string())]),
plan: Plan::Reduce {
input: Box::new(
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(0)),
}
.with_types(
RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint32_datatype(),
false,
)])
.into_named(vec![Some("number".to_string())]),
)
.mfp(MapFilterProject::new(1).into_safe())
.unwrap(),
),
key_val_plan: KeyValPlan {
key_plan: MapFilterProject::new(1)
.map(vec![ScalarExpr::Column(0)])
.unwrap()
.project(vec![1])
.unwrap()
.into_safe(),
val_plan: MapFilterProject::new(1)
.project(vec![0])
.unwrap()
.into_safe(),
},
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
full_aggrs: vec![],
simple_aggrs: vec![],
distinct_aggrs: vec![],
}),
},
};
assert_eq!(flow_plan, expected);
}
#[tokio::test]
async fn test_sum_group_by() {
let engine = create_test_query_engine();
@@ -1332,8 +1422,8 @@ mod test {
])
.with_key(vec![1])
.into_named(vec![
Some("SUM(numbers.number)".to_string()),
Some("numbers.number".to_string()),
Some("sum(numbers.number)".to_string()),
Some("number".to_string()),
]),
plan: Plan::Mfp {
input: Box::new(
@@ -1410,7 +1500,7 @@ mod test {
let expected = TypedPlan {
schema: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)])
.into_named(vec![Some(
"SUM(numbers.number + numbers.number)".to_string(),
"sum(numbers.number + numbers.number)".to_string(),
)]),
plan: Plan::Reduce {
input: Box::new(
@@ -1490,7 +1580,7 @@ mod test {
.with_time_index(Some(1))
.into_named(vec![
Some(
"MAX(numbers_with_ts.number) - MIN(numbers_with_ts.number) / Float64(30)"
"max(numbers_with_ts.number) - min(numbers_with_ts.number) / Float64(30)"
.to_string(),
),
Some("time_window".to_string()),
@@ -1522,7 +1612,7 @@ mod test {
df_scalar_fn: DfScalarFunction::try_from_raw_fn(
RawDfScalarFn {
f: BytesMut::from(
b"\x08\x02\"I\x1aG\nE\x8a\x02?\x08\x03\x12+\n\x17interval-month-day-nano\x12\x10\0\xac#\xfc\x06\0\0\0\0\0\0\0\0\0\0\0\x1a\x06\x12\x04:\x02\x10\x02\x1a\x06\x12\x04:\x02\x10\x02\x98\x03\x03\"\n\x1a\x08\x12\x06\n\x04\x12\x02\x08\x01".as_ref(),
b"\x08\x02\"\x0f\x1a\r\n\x0b\xa2\x02\x08\n\0\x12\x04\x10\x1e \t\"\n\x1a\x08\x12\x06\n\x04\x12\x02\x08\x01".as_ref(),
),
input_schema: RelationType::new(vec![ColumnType::new(
ConcreteDataType::interval_month_day_nano_datatype(),
@@ -1532,15 +1622,13 @@ mod test {
false,
)])
.into_unnamed(),
extensions: FunctionExtensions {
anchor_to_name: BTreeMap::from([
extensions: FunctionExtensions::from_iter([
(0, "subtract".to_string()),
(1, "divide".to_string()),
(2, "date_bin".to_string()),
(3, "max".to_string()),
(4, "min".to_string()),
]),
},
},
)
.await

View File

@@ -18,6 +18,8 @@ use std::sync::Arc;
use common_error::ext::BoxedError;
use common_telemetry::debug;
use datafusion::execution::SessionStateBuilder;
use datafusion::functions::all_default_functions;
use datafusion_physical_expr::PhysicalExpr;
use datatypes::data_type::ConcreteDataType as CDT;
use snafu::{ensure, OptionExt, ResultExt};
@@ -86,16 +88,15 @@ pub(crate) async fn from_scalar_fn_to_df_fn_impl(
};
let schema = input_schema.to_df_schema()?;
let df_expr =
// TODO(discord9): consider coloring everything async....
substrait::df_logical_plan::consumer::from_substrait_rex(
&datafusion::prelude::SessionContext::new(),
&e,
&schema,
&extensions.inner_ref(),
)
.await
;
let df_expr = substrait::df_logical_plan::consumer::from_substrait_rex(
&SessionStateBuilder::new()
.with_scalar_functions(all_default_functions())
.build(),
&e,
&schema,
&extensions.to_extensions(),
)
.await;
let expr = df_expr.context({
DatafusionSnafu {
context: "Failed to convert substrait scalar function to datafusion scalar function",
@@ -551,7 +552,8 @@ mod test {
#[tokio::test]
async fn test_where_and() {
let engine = create_test_query_engine();
let sql = "SELECT number FROM numbers WHERE number >= 1 AND number <= 3 AND number!=2";
let sql =
"SELECT number FROM numbers_with_ts WHERE number >= 1 AND number <= 3 AND number!=2";
let plan = sql_to_substrait(engine.clone(), sql).await;
let mut ctx = create_test_ctx();
@@ -561,37 +563,53 @@ mod test {
let filter = ScalarExpr::CallVariadic {
func: VariadicFunc::And,
exprs: vec![
ScalarExpr::Column(0).call_binary(
ScalarExpr::Literal(Value::from(1u32), CDT::uint32_datatype()),
ScalarExpr::Column(2).call_binary(
ScalarExpr::Literal(Value::from(1i64), CDT::int64_datatype()),
BinaryFunc::Gte,
),
ScalarExpr::Column(0).call_binary(
ScalarExpr::Literal(Value::from(3u32), CDT::uint32_datatype()),
ScalarExpr::Column(2).call_binary(
ScalarExpr::Literal(Value::from(3i64), CDT::int64_datatype()),
BinaryFunc::Lte,
),
ScalarExpr::Column(0).call_binary(
ScalarExpr::Literal(Value::from(2u32), CDT::uint32_datatype()),
ScalarExpr::Column(2).call_binary(
ScalarExpr::Literal(Value::from(2i64), CDT::int64_datatype()),
BinaryFunc::NotEq,
),
],
};
let expected = TypedPlan {
schema: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
.into_named(vec![Some("numbers.number".to_string())]),
.into_named(vec![Some("number".to_string())]),
plan: Plan::Mfp {
input: Box::new(
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(0)),
id: crate::expr::Id::Global(GlobalId::User(1)),
}
.with_types(
RelationType::new(vec![ColumnType::new(
ConcreteDataType::uint32_datatype(),
false,
)])
.into_named(vec![Some("number".to_string())]),
RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
])
.into_named(vec![Some("number".to_string()), Some("ts".to_string())]),
),
),
mfp: MapFilterProject::new(1).filter(vec![filter]).unwrap(),
mfp: MapFilterProject::new(2)
.map(vec![
ScalarExpr::CallUnary {
func: UnaryFunc::Cast(CDT::int64_datatype()),
expr: Box::new(ScalarExpr::Column(0)),
},
ScalarExpr::Column(0),
ScalarExpr::Column(3),
])
.unwrap()
.filter(vec![filter])
.unwrap()
.project(vec![4])
.unwrap(),
},
};
assert_eq!(flow_plan.unwrap(), expected);

View File

@@ -16,29 +16,67 @@ use std::array::TryFromSliceError;
use bytes::Bytes;
use common_decimal::Decimal128;
use common_time::{Date, Timestamp};
use common_time::timestamp::TimeUnit;
use common_time::{Date, IntervalMonthDayNano, Timestamp};
use datafusion_common::ScalarValue;
use datatypes::data_type::ConcreteDataType as CDT;
use datatypes::value::Value;
use num_traits::FromBytes;
use snafu::ensure;
use substrait::substrait_proto_df::proto::expression::literal::user_defined::Val;
use substrait::substrait_proto_df::proto::expression::literal::UserDefined;
use snafu::OptionExt;
use substrait::variation_const::{
DATE_32_TYPE_VARIATION_REF, DATE_64_TYPE_VARIATION_REF, DEFAULT_TYPE_VARIATION_REF,
INTERVAL_DAY_TIME_TYPE_REF, INTERVAL_DAY_TIME_TYPE_URL, INTERVAL_MONTH_DAY_NANO_TYPE_REF,
INTERVAL_MONTH_DAY_NANO_TYPE_URL, INTERVAL_YEAR_MONTH_TYPE_REF, INTERVAL_YEAR_MONTH_TYPE_URL,
TIMESTAMP_MICRO_TYPE_VARIATION_REF, TIMESTAMP_MILLI_TYPE_VARIATION_REF,
TIMESTAMP_NANO_TYPE_VARIATION_REF, TIMESTAMP_SECOND_TYPE_VARIATION_REF,
UNSIGNED_INTEGER_TYPE_VARIATION_REF,
};
use substrait_proto::proto::expression::literal::LiteralType;
use substrait_proto::proto;
use substrait_proto::proto::expression::literal::{LiteralType, PrecisionTimestamp};
use substrait_proto::proto::expression::Literal;
use substrait_proto::proto::r#type::Kind;
use crate::error::{Error, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu};
use crate::transform::substrait_proto;
#[derive(Debug)]
enum TimestampPrecision {
Second = 0,
Millisecond = 3,
Microsecond = 6,
Nanosecond = 9,
}
impl TryFrom<i32> for TimestampPrecision {
type Error = Error;
fn try_from(prec: i32) -> Result<Self, Self::Error> {
match prec {
0 => Ok(Self::Second),
3 => Ok(Self::Millisecond),
6 => Ok(Self::Microsecond),
9 => Ok(Self::Nanosecond),
_ => not_impl_err!("Unsupported precision: {prec}"),
}
}
}
impl TimestampPrecision {
fn to_time_unit(&self) -> TimeUnit {
match self {
Self::Second => TimeUnit::Second,
Self::Millisecond => TimeUnit::Millisecond,
Self::Microsecond => TimeUnit::Microsecond,
Self::Nanosecond => TimeUnit::Nanosecond,
}
}
fn to_cdt(&self) -> CDT {
match self {
Self::Second => CDT::timestamp_second_datatype(),
Self::Millisecond => CDT::timestamp_millisecond_datatype(),
Self::Microsecond => CDT::timestamp_microsecond_datatype(),
Self::Nanosecond => CDT::timestamp_nanosecond_datatype(),
}
}
}
/// TODO(discord9): this is copy from datafusion-substrait since the original function is not public, will be replace once is exported
pub(crate) fn to_substrait_literal(value: &ScalarValue) -> Result<Literal, Error> {
if value.is_null() {
@@ -68,21 +106,34 @@ pub(crate) fn to_substrait_literal(value: &ScalarValue) -> Result<Literal, Error
),
ScalarValue::Float32(Some(f)) => (LiteralType::Fp32(*f), DEFAULT_TYPE_VARIATION_REF),
ScalarValue::Float64(Some(f)) => (LiteralType::Fp64(*f), DEFAULT_TYPE_VARIATION_REF),
// TODO(discord9): deal with timezone
ScalarValue::TimestampSecond(Some(t), _) => (
LiteralType::Timestamp(*t),
TIMESTAMP_SECOND_TYPE_VARIATION_REF,
LiteralType::PrecisionTimestamp(PrecisionTimestamp {
value: *t,
precision: TimestampPrecision::Second as i32,
}),
DEFAULT_TYPE_VARIATION_REF,
),
ScalarValue::TimestampMillisecond(Some(t), _) => (
LiteralType::Timestamp(*t),
TIMESTAMP_MILLI_TYPE_VARIATION_REF,
LiteralType::PrecisionTimestamp(PrecisionTimestamp {
value: *t,
precision: TimestampPrecision::Millisecond as i32,
}),
DEFAULT_TYPE_VARIATION_REF,
),
ScalarValue::TimestampMicrosecond(Some(t), _) => (
LiteralType::Timestamp(*t),
TIMESTAMP_MICRO_TYPE_VARIATION_REF,
LiteralType::PrecisionTimestamp(PrecisionTimestamp {
value: *t,
precision: TimestampPrecision::Microsecond as i32,
}),
DEFAULT_TYPE_VARIATION_REF,
),
ScalarValue::TimestampNanosecond(Some(t), _) => (
LiteralType::Timestamp(*t),
TIMESTAMP_NANO_TYPE_VARIATION_REF,
LiteralType::PrecisionTimestamp(PrecisionTimestamp {
value: *t,
precision: TimestampPrecision::Nanosecond as i32,
}),
DEFAULT_TYPE_VARIATION_REF,
),
ScalarValue::Date32(Some(d)) => (LiteralType::Date(*d), DATE_32_TYPE_VARIATION_REF),
_ => (
@@ -124,25 +175,17 @@ pub(crate) fn from_substrait_literal(lit: &Literal) -> Result<(Value, CDT), Erro
},
Some(LiteralType::Fp32(f)) => (Value::from(*f), CDT::float32_datatype()),
Some(LiteralType::Fp64(f)) => (Value::from(*f), CDT::float64_datatype()),
Some(LiteralType::Timestamp(t)) => match lit.type_variation_reference {
TIMESTAMP_SECOND_TYPE_VARIATION_REF => (
Value::from(Timestamp::new_second(*t)),
CDT::timestamp_second_datatype(),
),
TIMESTAMP_MILLI_TYPE_VARIATION_REF => (
Value::from(Timestamp::new_millisecond(*t)),
CDT::timestamp_millisecond_datatype(),
),
TIMESTAMP_MICRO_TYPE_VARIATION_REF => (
Value::from(Timestamp::new_microsecond(*t)),
CDT::timestamp_microsecond_datatype(),
),
TIMESTAMP_NANO_TYPE_VARIATION_REF => (
Value::from(Timestamp::new_nanosecond(*t)),
CDT::timestamp_nanosecond_datatype(),
),
others => not_impl_err!("Unknown type variation reference {others}",)?,
},
Some(LiteralType::Timestamp(t)) => (
Value::from(Timestamp::new_microsecond(*t)),
CDT::timestamp_microsecond_datatype(),
),
Some(LiteralType::PrecisionTimestamp(prec_ts)) => {
let (prec, val) = (prec_ts.precision, prec_ts.value);
let prec = TimestampPrecision::try_from(prec)?;
let unit = prec.to_time_unit();
let typ = prec.to_cdt();
(Value::from(Timestamp::new(val, unit)), typ)
}
Some(LiteralType::Date(d)) => (Value::from(Date::new(*d)), CDT::date_datatype()),
Some(LiteralType::String(s)) => (Value::from(s.clone()), CDT::string_datatype()),
Some(LiteralType::Binary(b)) | Some(LiteralType::FixedBinary(b)) => {
@@ -174,30 +217,116 @@ pub(crate) fn from_substrait_literal(lit: &Literal) -> Result<(Value, CDT), Erro
)
}
Some(LiteralType::Null(ntype)) => (Value::Null, from_substrait_type(ntype)?),
Some(LiteralType::IntervalDayToSecond(interval)) => {
let (days, seconds, microseconds) =
(interval.days, interval.seconds, interval.microseconds);
let millis = microseconds / 1000 + seconds * 1000;
let value_interval = common_time::IntervalDayTime::new(days, millis);
Some(LiteralType::IntervalDayToSecond(interval)) => from_interval_day_sec(interval)?,
Some(LiteralType::IntervalYearToMonth(interval)) => from_interval_year_month(interval)?,
Some(LiteralType::IntervalCompound(interval_compound)) => {
let interval_day_time = &interval_compound
.interval_day_to_second
.map(|i| from_interval_day_sec(&i))
.transpose()?;
let interval_year_month = &interval_compound
.interval_year_to_month
.map(|i| from_interval_year_month(&i))
.transpose()?;
let mut compound = IntervalMonthDayNano::new(0, 0, 0);
if let Some(day_sec) = interval_day_time {
let Value::IntervalDayTime(day_time) = day_sec.0 else {
UnexpectedSnafu {
reason: format!("Expect IntervalDayTime, found {:?}", day_sec),
}
.fail()?
};
// 1 day in milliseconds = 24 * 60 * 60 * 1000 = 8.64e7 ms = 8.64e13 ns << 2^63
// so overflow is unexpected
compound.nanoseconds = compound
.nanoseconds
.checked_add(day_time.milliseconds as i64 * 1_000_000)
.with_context(|| UnexpectedSnafu {
reason: format!(
"Overflow when converting interval: {:?}",
interval_compound
),
})?;
compound.days += day_time.days;
}
if let Some(year_month) = interval_year_month {
let Value::IntervalYearMonth(year_month) = year_month.0 else {
UnexpectedSnafu {
reason: format!("Expect IntervalYearMonth, found {:?}", year_month),
}
.fail()?
};
compound.months += year_month.months;
}
(
Value::IntervalDayTime(value_interval),
CDT::interval_day_time_datatype(),
Value::IntervalMonthDayNano(compound),
CDT::interval_month_day_nano_datatype(),
)
}
Some(LiteralType::IntervalYearToMonth(interval)) => (
Value::IntervalYearMonth(common_time::IntervalYearMonth::new(
interval.years * 12 + interval.months,
)),
CDT::interval_year_month_datatype(),
),
Some(LiteralType::UserDefined(user_defined)) => {
from_substrait_user_defined_type(user_defined)?
}
_ => not_impl_err!("unsupported literal_type: {:?}", &lit.literal_type)?,
};
Ok(scalar_value)
}
fn from_interval_day_sec(
interval: &proto::expression::literal::IntervalDayToSecond,
) -> Result<(Value, CDT), Error> {
let (days, seconds, subseconds) = (interval.days, interval.seconds, interval.subseconds);
let millis = if let Some(prec) = interval.precision_mode {
use substrait_proto::proto::expression::literal::interval_day_to_second::PrecisionMode;
match prec {
PrecisionMode::Precision(e) => {
if e >= 3 {
subseconds
/ 10_i64
.checked_pow((e - 3) as _)
.with_context(|| UnexpectedSnafu {
reason: format!(
"Overflow when converting interval: {:?}",
interval
),
})?
} else {
subseconds
* 10_i64
.checked_pow((3 - e) as _)
.with_context(|| UnexpectedSnafu {
reason: format!(
"Overflow when converting interval: {:?}",
interval
),
})?
}
}
PrecisionMode::Microseconds(_) => subseconds / 1000,
}
} else if subseconds == 0 {
0
} else {
not_impl_err!("unsupported subseconds without precision_mode: {subseconds}")?
};
let value_interval = common_time::IntervalDayTime::new(days, seconds * 1000 + millis as i32);
Ok((
Value::IntervalDayTime(value_interval),
CDT::interval_day_time_datatype(),
))
}
fn from_interval_year_month(
interval: &proto::expression::literal::IntervalYearToMonth,
) -> Result<(Value, CDT), Error> {
let value_interval = common_time::IntervalYearMonth::new(interval.years * 12 + interval.months);
Ok((
Value::IntervalYearMonth(value_interval),
CDT::interval_year_month_datatype(),
))
}
fn from_bytes<T: FromBytes>(i: &Bytes) -> Result<T, Error>
where
for<'a> &'a <T as num_traits::FromBytes>::Bytes:
@@ -218,79 +347,6 @@ where
Ok(i)
}
fn from_substrait_user_defined_type(user_defined: &UserDefined) -> Result<(Value, CDT), Error> {
if let UserDefined {
type_reference,
type_parameters: _,
val: Some(Val::Value(val)),
} = user_defined
{
// see https://github.com/apache/datafusion/blob/146b679aa19c7749cc73d0c27440419d6498142b/datafusion/substrait/src/logical_plan/producer.rs#L1957
// for interval type's transform to substrait
let ret = match *type_reference {
INTERVAL_YEAR_MONTH_TYPE_REF => {
ensure!(
val.type_url == INTERVAL_YEAR_MONTH_TYPE_URL,
UnexpectedSnafu {
reason: format!(
"Expect {}, found {} in type_url",
INTERVAL_YEAR_MONTH_TYPE_URL, val.type_url
)
}
);
let i: i32 = from_bytes(&val.value)?;
let value_interval = common_time::IntervalYearMonth::new(i);
(
Value::IntervalYearMonth(value_interval),
CDT::interval_year_month_datatype(),
)
}
INTERVAL_MONTH_DAY_NANO_TYPE_REF => {
ensure!(
val.type_url == INTERVAL_MONTH_DAY_NANO_TYPE_URL,
UnexpectedSnafu {
reason: format!(
"Expect {}, found {} in type_url",
INTERVAL_MONTH_DAY_NANO_TYPE_URL, val.type_url
)
}
);
// TODO(yingwen): Datafusion may change the representation of the interval type.
let i: i128 = from_bytes(&val.value)?;
let (months, days, nsecs) = ((i >> 96) as i32, (i >> 64) as i32, i as i64);
let value_interval = common_time::IntervalMonthDayNano::new(months, days, nsecs);
(
Value::IntervalMonthDayNano(value_interval),
CDT::interval_month_day_nano_datatype(),
)
}
INTERVAL_DAY_TIME_TYPE_REF => {
ensure!(
val.type_url == INTERVAL_DAY_TIME_TYPE_URL,
UnexpectedSnafu {
reason: format!(
"Expect {}, found {} in type_url",
INTERVAL_DAY_TIME_TYPE_URL, val.type_url
)
}
);
// TODO(yingwen): Datafusion may change the representation of the interval type.
let i: i64 = from_bytes(&val.value)?;
let (days, millis) = ((i >> 32) as i32, i as i32);
let value_interval = common_time::IntervalDayTime::new(days, millis);
(
Value::IntervalDayTime(value_interval),
CDT::interval_day_time_datatype(),
)
}
_ => return not_impl_err!("unsupported user defined type: {:?}", user_defined)?,
};
Ok(ret)
} else {
not_impl_err!("Expect val to be Some(...)")
}
}
/// convert a Substrait type into a ConcreteDataType
pub fn from_substrait_type(null_type: &substrait_proto::proto::Type) -> Result<CDT, Error> {
if let Some(kind) = &null_type.kind {
@@ -318,13 +374,9 @@ pub fn from_substrait_type(null_type: &substrait_proto::proto::Type) -> Result<C
},
Kind::Fp32(_) => Ok(CDT::float32_datatype()),
Kind::Fp64(_) => Ok(CDT::float64_datatype()),
Kind::Timestamp(ts) => match ts.type_variation_reference {
TIMESTAMP_SECOND_TYPE_VARIATION_REF => Ok(CDT::timestamp_second_datatype()),
TIMESTAMP_MILLI_TYPE_VARIATION_REF => Ok(CDT::timestamp_millisecond_datatype()),
TIMESTAMP_MICRO_TYPE_VARIATION_REF => Ok(CDT::timestamp_microsecond_datatype()),
TIMESTAMP_NANO_TYPE_VARIATION_REF => Ok(CDT::timestamp_nanosecond_datatype()),
v => not_impl_err!("Unsupported Substrait type variation {v} of type {kind:?}"),
},
Kind::PrecisionTimestamp(ts) => {
Ok(TimestampPrecision::try_from(ts.precision)?.to_cdt())
}
Kind::Date(date) => match date.type_variation_reference {
DATE_32_TYPE_VARIATION_REF | DATE_64_TYPE_VARIATION_REF => Ok(CDT::date_datatype()),
v => not_impl_err!("Unsupported Substrait type variation {v} of type {kind:?}"),

View File

@@ -83,6 +83,7 @@ impl TypedPlan {
// because this `input.schema` is incorrect for pre-expand substrait plan, so we have to use schema before expand multi-value
// function to correctly transform it, and late rewrite it
// TODO(discord9): this logic is obsoleted since now expand happens in datafusion optimizer
let schema_before_expand = {
let input_schema = input.schema.clone();
let auto_columns: HashSet<usize> =
@@ -176,6 +177,7 @@ impl TypedPlan {
}
.fail()?,
};
let table = ctx.table(&table_reference).await?;
let get_table = Plan::Get {
id: crate::expr::Id::Global(table.0),
@@ -252,7 +254,7 @@ mod test {
let expected = TypedPlan {
schema: RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)])
.into_named(vec![Some("numbers.number".to_string())]),
.into_named(vec![Some("number".to_string())]),
plan: Plan::Mfp {
input: Box::new(
Plan::Get {

View File

@@ -50,7 +50,7 @@ impl BloomFilterApplier {
let deduped_locs = locs
.iter()
.dedup()
.map(|i| self.meta.bloom_filter_locs[*i as usize].clone())
.map(|i| self.meta.bloom_filter_locs[*i as usize])
.collect::<Vec<_>>();
let bfs = self.reader.bloom_filter_vec(&deduped_locs).await?;

View File

@@ -17,6 +17,7 @@ local-ip-address.workspace = true
[dependencies]
api.workspace = true
async-trait = "0.1"
bytes.workspace = true
chrono.workspace = true
clap.workspace = true
client.workspace = true
@@ -43,9 +44,10 @@ derive_builder.workspace = true
etcd-client.workspace = true
futures.workspace = true
h2 = "0.3"
http-body = "0.4"
http-body-util = "0.1"
humantime.workspace = true
humantime-serde.workspace = true
hyper-util = { workspace = true, features = ["tokio"] }
itertools.workspace = true
lazy_static.workspace = true
once_cell.workspace = true

View File

@@ -25,6 +25,7 @@ use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use hyper_util::rt::TokioIo;
use tonic::codec::CompressionEncoding;
use tower::service_fn;
@@ -138,7 +139,7 @@ pub async fn mock(
async move {
if let Some(client) = client {
Ok(client)
Ok(TokioIo::new(client))
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,

View File

@@ -26,6 +26,7 @@ pub mod mock {
use common_meta::peer::Peer;
use common_runtime::runtime::BuilderBuild;
use common_runtime::{Builder as RuntimeBuilder, Runtime};
use hyper_util::rt::TokioIo;
use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler};
use tokio::sync::mpsc;
use tonic::codec::CompressionEncoding;
@@ -77,7 +78,7 @@ pub mod mock {
datanode.addr.clone(),
service_fn(move |_| {
let client = client.take().unwrap();
async move { Ok::<_, Error>(client) }
async move { Ok::<_, Error>(TokioIo::new(client)) }
}),
)
.unwrap();

View File

@@ -24,6 +24,8 @@ use std::convert::Infallible;
use std::sync::Arc;
use std::task::{Context, Poll};
use bytes::Bytes;
use http_body_util::{BodyExt, Full};
use tonic::body::BoxBody;
use tonic::codegen::{empty_body, http, BoxFuture, Service};
use tonic::server::NamedService;
@@ -190,10 +192,12 @@ fn check_path(path: &str) {
}
}
/// Returns a [BoxBody] from a string.
/// The implementation follows [empty_body()].
fn boxed(body: String) -> BoxBody {
use http_body::Body;
body.map_err(|_| panic!("")).boxed_unsync()
Full::new(Bytes::from(body))
.map_err(|err| match err {})
.boxed_unsync()
}
#[cfg(test)]

View File

@@ -120,7 +120,8 @@ fn parquet_offset_index_heap_size(offset_index: &ParquetOffsetIndex) -> usize {
row_group
.iter()
.map(|column| {
column.len() * mem::size_of::<PageLocation>() + mem::size_of_val(column)
column.page_locations.len() * mem::size_of::<PageLocation>()
+ mem::size_of_val(column)
})
.sum::<usize>()
+ mem::size_of_val(row_group)

View File

@@ -411,7 +411,7 @@ mod tests {
let cache = FileCache::new(
local_store.clone(),
ReadableSize::mb(10),
Some(Duration::from_millis(5)),
Some(Duration::from_millis(10)),
);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
@@ -437,7 +437,7 @@ mod tests {
let exist = cache.reader(key).await;
assert!(exist.is_some());
tokio::time::sleep(Duration::from_millis(10)).await;
tokio::time::sleep(Duration::from_millis(15)).await;
cache.memory_index.run_pending_tasks().await;
let non = cache.reader(key).await;
assert!(non.is_none());

View File

@@ -556,7 +556,7 @@ async fn test_region_usage() {
// region is empty now, check manifest size
let region = engine.get_region(region_id).unwrap();
let region_stat = region.region_statistic();
assert_eq!(region_stat.manifest_size, 717);
assert!(region_stat.manifest_size > 0);
// put some rows
let rows = Rows {
@@ -583,12 +583,12 @@ async fn test_region_usage() {
flush_region(&engine, region_id, None).await;
let region_stat = region.region_statistic();
assert!(region_stat.sst_size > 0); // Chief says this assert can ensure the size is counted.
assert!(region_stat.sst_size > 0);
assert_eq!(region_stat.num_rows, 10);
// region total usage
// Some memtables may share items.
assert!(region_stat.estimated_disk_size() >= 4028);
assert!(region_stat.estimated_disk_size() > 3000);
}
#[tokio::test]

View File

@@ -47,32 +47,20 @@ impl<'a> MemtableRowGroupPageFetcher<'a> {
parquet_meta: &'a ParquetMetaData,
bytes: Bytes,
) -> Self {
let metadata = parquet_meta.row_group(row_group_idx);
let row_count = metadata.num_rows() as usize;
let page_locations = parquet_meta
.offset_index()
.map(|x| x[row_group_idx].as_slice());
Self {
base: RowGroupBase {
metadata,
page_locations,
row_count,
column_chunks: vec![None; metadata.columns().len()],
// the cached `column_uncompressed_pages` would never be used in Memtable readers.
column_uncompressed_pages: vec![None; metadata.columns().len()],
},
// the cached `column_uncompressed_pages` would never be used in Memtable readers.
base: RowGroupBase::new(parquet_meta, row_group_idx),
bytes,
}
}
/// Fetches column pages from memory file.
pub(crate) fn fetch(&mut self, projection: &ProjectionMask, selection: Option<&RowSelection>) {
if let Some((selection, page_locations)) = selection.zip(self.base.page_locations) {
if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
// Selection provided.
let (fetch_ranges, page_start_offsets) =
self.base
.calc_sparse_read_ranges(projection, page_locations, selection);
.calc_sparse_read_ranges(projection, offset_index, selection);
if fetch_ranges.is_empty() {
return;
}

View File

@@ -55,15 +55,27 @@ pub struct ProjectionMapper {
column_ids: Vec<ColumnId>,
/// Ids and DataTypes of field columns in the [Batch].
batch_fields: Vec<(ColumnId, ConcreteDataType)>,
/// `true` If the original projection is empty.
is_empty_projection: bool,
}
impl ProjectionMapper {
/// Returns a new mapper with projection.
/// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count.
/// `SELECT COUNT(*) FROM table` is an example that uses an empty projection. DataFusion accepts
/// empty `RecordBatch` and only use its row count in this query.
pub fn new(
metadata: &RegionMetadataRef,
projection: impl Iterator<Item = usize>,
) -> Result<ProjectionMapper> {
let projection: Vec<_> = projection.collect();
let mut projection: Vec<_> = projection.collect();
// If the original projection is empty.
let is_empty_projection = projection.is_empty();
if is_empty_projection {
// If the projection is empty, we still read the time index column.
projection.push(metadata.time_index_column_pos());
}
let mut column_schemas = Vec::with_capacity(projection.len());
let mut column_ids = Vec::with_capacity(projection.len());
for idx in &projection {
@@ -81,6 +93,21 @@ impl ProjectionMapper {
column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
}
let codec = DensePrimaryKeyCodec::new(metadata);
if is_empty_projection {
// If projection is empty, we don't output any column.
return Ok(ProjectionMapper {
metadata: metadata.clone(),
batch_indices: vec![],
has_tags: false,
codec,
output_schema: Arc::new(Schema::new(vec![])),
column_ids,
batch_fields: vec![],
is_empty_projection,
});
}
// Safety: Columns come from existing schema.
let output_schema = Arc::new(Schema::new(column_schemas));
// Get fields in each batch.
@@ -127,6 +154,7 @@ impl ProjectionMapper {
output_schema,
column_ids,
batch_fields,
is_empty_projection,
})
}
@@ -140,7 +168,8 @@ impl ProjectionMapper {
&self.metadata
}
/// Returns ids of projected columns.
/// Returns ids of projected columns that we need to read
/// from memtables and SSTs.
pub(crate) fn column_ids(&self) -> &[ColumnId] {
&self.column_ids
}
@@ -151,6 +180,8 @@ impl ProjectionMapper {
}
/// Returns the schema of converted [RecordBatch].
/// This is the schema that the stream will output. This schema may contain
/// less columns than [ProjectionMapper::column_ids()].
pub(crate) fn output_schema(&self) -> SchemaRef {
self.output_schema.clone()
}
@@ -168,6 +199,10 @@ impl ProjectionMapper {
batch: &Batch,
cache_strategy: &CacheStrategy,
) -> common_recordbatch::error::Result<RecordBatch> {
if self.is_empty_projection {
return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
}
debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
debug_assert!(self
.batch_fields
@@ -411,4 +446,30 @@ mod tests {
+----+----+";
assert_eq!(expect, print_record_batch(record_batch));
}
#[test]
fn test_projection_mapper_empty_projection() {
let metadata = Arc::new(
TestRegionMetadataBuilder::default()
.num_tags(2)
.num_fields(2)
.build(),
);
// Empty projection
let mapper = ProjectionMapper::new(&metadata, [].into_iter()).unwrap();
assert_eq!([0], mapper.column_ids()); // Should still read the time index column
assert!(mapper.batch_fields().is_empty());
assert!(!mapper.has_tags);
assert!(mapper.batch_indices.is_empty());
assert!(mapper.output_schema().is_empty());
assert!(mapper.is_empty_projection);
let batch = new_batch(0, &[1, 2], &[], 3);
let cache = CacheManager::builder().vector_cache_size(1024).build();
let cache = CacheStrategy::EnableAll(Arc::new(cache));
let record_batch = mapper.convert(&batch, &cache).unwrap();
assert_eq!(3, record_batch.num_rows());
assert_eq!(0, record_batch.num_columns());
assert!(record_batch.schema.is_empty());
}
}

View File

@@ -155,7 +155,7 @@ impl WriteRequest {
ensure!(
is_column_type_value_eq(
input_col.datatype,
input_col.datatype_extension.clone(),
input_col.datatype_extension,
&column.column_schema.data_type
),
InvalidRequestSnafu {

View File

@@ -215,22 +215,18 @@ impl FileRangeContext {
let stats = column_metadata.statistics().context(StatsNotPresentSnafu {
file_path: self.reader_builder.file_path(),
})?;
if stats.has_min_max_set() {
stats
.min_bytes()
.try_into()
.map(i32::from_le_bytes)
.map(|min_op_type| min_op_type == OpType::Delete as i32)
.ok()
.context(DecodeStatsSnafu {
file_path: self.reader_builder.file_path(),
})
} else {
DecodeStatsSnafu {
stats
.min_bytes_opt()
.context(StatsNotPresentSnafu {
file_path: self.reader_builder.file_path(),
}
.fail()
}
})?
.try_into()
.map(i32::from_le_bytes)
.map(|min_op_type| min_op_type == OpType::Delete as i32)
.ok()
.context(DecodeStatsSnafu {
file_path: self.reader_builder.file_path(),
})
}
}

View File

@@ -410,9 +410,6 @@ impl ReadFormat {
.borrow()
.column(self.primary_key_position())
.statistics()?;
if !stats.has_min_max_set() {
return None;
}
match stats {
Statistics::Boolean(_) => None,
Statistics::Int32(_) => None,
@@ -421,7 +418,11 @@ impl ReadFormat {
Statistics::Float(_) => None,
Statistics::Double(_) => None,
Statistics::ByteArray(s) => {
let bytes = if is_min { s.min_bytes() } else { s.max_bytes() };
let bytes = if is_min {
s.min_bytes_opt()?
} else {
s.max_bytes_opt()?
};
converter.decode_leftmost(bytes).ok()?
}
Statistics::FixedLenByteArray(_) => None,
@@ -460,39 +461,40 @@ impl ReadFormat {
.iter()
.map(|meta| {
let stats = meta.borrow().column(column_index).statistics()?;
if !stats.has_min_max_set() {
return None;
}
match stats {
Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
*s.min()
*s.min_opt()?
} else {
*s.max()
*s.max_opt()?
}))),
Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
*s.min()
*s.min_opt()?
} else {
*s.max()
*s.max_opt()?
}))),
Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
*s.min()
*s.min_opt()?
} else {
*s.max()
*s.max_opt()?
}))),
Statistics::Int96(_) => None,
Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
*s.min()
*s.min_opt()?
} else {
*s.max()
*s.max_opt()?
}))),
Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
*s.min()
*s.min_opt()?
} else {
*s.max()
*s.max_opt()?
}))),
Statistics::ByteArray(s) => {
let bytes = if is_min { s.min_bytes() } else { s.max_bytes() };
let bytes = if is_min {
s.min_bytes_opt()?
} else {
s.max_bytes_opt()?
};
let s = String::from_utf8(bytes.to_vec()).ok();
Some(ScalarValue::Utf8(s))
}
@@ -514,7 +516,7 @@ impl ReadFormat {
let values = row_groups.iter().map(|meta| {
let col = meta.borrow().column(column_index);
let stat = col.statistics()?;
Some(stat.null_count())
stat.null_count_opt()
});
Some(Arc::new(UInt64Array::from_iter(values)))
}
@@ -595,31 +597,30 @@ pub(crate) fn parquet_row_group_time_range(
let time_index_pos = num_columns - FIXED_POS_COLUMN_NUM;
let stats = row_group_meta.column(time_index_pos).statistics()?;
if stats.has_min_max_set() {
// The physical type for the timestamp should be i64.
let (min, max) = match stats {
Statistics::Int64(value_stats) => (*value_stats.min(), *value_stats.max()),
Statistics::Int32(_)
| Statistics::Boolean(_)
| Statistics::Int96(_)
| Statistics::Float(_)
| Statistics::Double(_)
| Statistics::ByteArray(_)
| Statistics::FixedLenByteArray(_) => return None,
};
// The physical type for the timestamp should be i64.
let (min, max) = match stats {
Statistics::Int64(value_stats) => (*value_stats.min_opt()?, *value_stats.max_opt()?),
Statistics::Int32(_)
| Statistics::Boolean(_)
| Statistics::Int96(_)
| Statistics::Float(_)
| Statistics::Double(_)
| Statistics::ByteArray(_)
| Statistics::FixedLenByteArray(_) => {
common_telemetry::warn!(
"Invalid statistics {:?} for time index in parquet in {}",
stats,
file_meta.file_id
);
return None;
}
};
debug_assert!(
min >= file_meta.time_range.0.value() && min <= file_meta.time_range.1.value()
);
debug_assert!(
max >= file_meta.time_range.0.value() && max <= file_meta.time_range.1.value()
);
let unit = file_meta.time_range.0.unit();
debug_assert!(min >= file_meta.time_range.0.value() && min <= file_meta.time_range.1.value());
debug_assert!(max >= file_meta.time_range.0.value() && max <= file_meta.time_range.1.value());
let unit = file_meta.time_range.0.unit();
Some((Timestamp::new(min, unit), Timestamp::new(max, unit)))
} else {
None
}
Some((Timestamp::new(min, unit), Timestamp::new(max, unit)))
}
#[cfg(test)]

View File

@@ -13,8 +13,7 @@
// limitations under the License.
use object_store::ObjectStore;
use parquet::file::footer::{decode_footer, decode_metadata};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use parquet::file::FOOTER_SIZE;
use snafu::ResultExt;
@@ -96,7 +95,7 @@ impl<'a> MetadataLoader<'a> {
let mut footer = [0; 8];
footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
let metadata_len = decode_footer(&footer).map_err(|e| {
let metadata_len = ParquetMetaDataReader::decode_footer(&footer).map_err(|e| {
error::InvalidParquetSnafu {
file: path,
reason: format!("failed to decode footer, {e}"),
@@ -118,14 +117,16 @@ impl<'a> MetadataLoader<'a> {
if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
// The whole metadata is in the first read
let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
let metadata = decode_metadata(&buffer[metadata_start..buffer_len - FOOTER_SIZE])
.map_err(|e| {
error::InvalidParquetSnafu {
file: path,
reason: format!("failed to decode metadata, {e}"),
}
.build()
})?;
let metadata = ParquetMetaDataReader::decode_metadata(
&buffer[metadata_start..buffer_len - FOOTER_SIZE],
)
.map_err(|e| {
error::InvalidParquetSnafu {
file: path,
reason: format!("failed to decode metadata, {e}"),
}
.build()
})?;
Ok(metadata)
} else {
// The metadata is out of buffer, need to make a second read
@@ -137,7 +138,7 @@ impl<'a> MetadataLoader<'a> {
.context(error::OpenDalSnafu)?
.to_vec();
let metadata = decode_metadata(&data).map_err(|e| {
let metadata = ParquetMetaDataReader::decode_metadata(&data).map_err(|e| {
error::InvalidParquetSnafu {
file: path,
reason: format!("failed to decode metadata, {e}"),

View File

@@ -24,10 +24,10 @@ use parquet::arrow::ProjectionMask;
use parquet::column::page::{PageIterator, PageReader};
use parquet::errors::{ParquetError, Result};
use parquet::file::metadata::{ColumnChunkMetaData, ParquetMetaData, RowGroupMetaData};
use parquet::file::page_index::offset_index::OffsetIndexMetaData;
use parquet::file::properties::DEFAULT_PAGE_SIZE;
use parquet::file::reader::{ChunkReader, Length};
use parquet::file::serialized_reader::SerializedPageReader;
use parquet::format::PageLocation;
use store_api::storage::RegionId;
use tokio::task::yield_now;
@@ -39,31 +39,33 @@ use crate::sst::parquet::helper::fetch_byte_ranges;
use crate::sst::parquet::page_reader::RowGroupCachedReader;
pub(crate) struct RowGroupBase<'a> {
pub(crate) metadata: &'a RowGroupMetaData,
pub(crate) page_locations: Option<&'a [Vec<PageLocation>]>,
metadata: &'a RowGroupMetaData,
pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
/// Compressed page of each column.
pub(crate) column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
pub(crate) row_count: usize,
/// Row group level cached pages for each column.
///
/// These pages are uncompressed pages of a row group.
/// `column_uncompressed_pages.len()` equals to `column_chunks.len()`.
pub(crate) column_uncompressed_pages: Vec<Option<Arc<PageValue>>>,
column_uncompressed_pages: Vec<Option<Arc<PageValue>>>,
}
impl<'a> RowGroupBase<'a> {
pub(crate) fn new(parquet_meta: &'a ParquetMetaData, row_group_idx: usize) -> Self {
let metadata = parquet_meta.row_group(row_group_idx);
// `page_locations` is always `None` if we don't set
// `offset_index` is always `None` if we don't set
// [with_page_index()](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index)
// to `true`.
let page_locations = parquet_meta
let offset_index = parquet_meta
.offset_index()
// filter out empty offset indexes (old versions specified Some(vec![]) when no present)
.filter(|index| !index.is_empty())
.map(|x| x[row_group_idx].as_slice());
Self {
metadata,
page_locations,
offset_index,
column_chunks: vec![None; metadata.columns().len()],
row_count: metadata.num_rows() as usize,
column_uncompressed_pages: vec![None; metadata.columns().len()],
@@ -73,7 +75,7 @@ impl<'a> RowGroupBase<'a> {
pub(crate) fn calc_sparse_read_ranges(
&self,
projection: &ProjectionMask,
page_locations: &[Vec<PageLocation>],
offset_index: &[OffsetIndexMetaData],
selection: &RowSelection,
) -> (Vec<Range<u64>>, Vec<Vec<usize>>) {
// If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
@@ -90,7 +92,7 @@ impl<'a> RowGroupBase<'a> {
// then we need to also fetch a dictionary page.
let mut ranges = vec![];
let (start, _len) = chunk_meta.byte_range();
match page_locations[idx].first() {
match offset_index[idx].page_locations.first() {
Some(first) if first.offset as u64 != start => {
ranges.push(start..first.offset as u64);
}
@@ -99,7 +101,7 @@ impl<'a> RowGroupBase<'a> {
ranges.extend(
selection
.scan_ranges(&page_locations[idx])
.scan_ranges(&offset_index[idx].page_locations)
.iter()
.map(|range| range.start as u64..range.end as u64),
);
@@ -203,7 +205,11 @@ impl<'a> RowGroupBase<'a> {
)))
}
Some(data) => {
let page_locations = self.page_locations.map(|index| index[col_idx].clone());
let page_locations = self
.offset_index
// filter out empty offset indexes (old versions specified Some(vec![]) when no present)
.filter(|index| !index.is_empty())
.map(|index| index[col_idx].page_locations.clone());
SerializedPageReader::new(
data.clone(),
self.metadata.column(col_idx),
@@ -245,13 +251,13 @@ impl<'a> InMemoryRowGroup<'a> {
object_store: ObjectStore,
) -> Self {
Self {
base: RowGroupBase::new(parquet_meta, row_group_idx),
region_id,
file_id,
row_group_idx,
cache_strategy,
file_path,
object_store,
base: RowGroupBase::new(parquet_meta, row_group_idx),
}
}
@@ -261,10 +267,10 @@ impl<'a> InMemoryRowGroup<'a> {
projection: &ProjectionMask,
selection: Option<&RowSelection>,
) -> Result<()> {
if let Some((selection, page_locations)) = selection.zip(self.base.page_locations) {
if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
let (fetch_ranges, page_start_offsets) =
self.base
.calc_sparse_read_ranges(projection, page_locations, selection);
.calc_sparse_read_ranges(projection, offset_index, selection);
let chunk_data = self.fetch_bytes(&fetch_ranges).await?;
// Assign sparse chunk data to base.

View File

@@ -112,7 +112,7 @@ fn validate_rows(rows: &Option<Rows>) -> Result<()> {
for (col_idx, schema) in rows.schema.iter().enumerate() {
let column_type =
ColumnDataTypeWrapper::try_new(schema.datatype, schema.datatype_extension.clone())
ColumnDataTypeWrapper::try_new(schema.datatype, schema.datatype_extension)
.context(ColumnDataTypeSnafu)?
.into();
@@ -172,7 +172,7 @@ pub fn columns_to_rows(columns: Vec<Column>, row_count: u32) -> Result<Rows> {
column_name: column.column_name.clone(),
datatype: column.datatype,
semantic_type: column.semantic_type,
datatype_extension: column.datatype_extension.clone(),
datatype_extension: column.datatype_extension,
options: column.options.clone(),
};
schema.push(column_schema);

View File

@@ -109,7 +109,7 @@ impl Requester {
.map(|partition| {
RegionRequestBody::Compact(CompactRequest {
region_id: partition.id.into(),
options: Some(request.compact_options.clone()),
options: Some(request.compact_options),
})
})
.collect();

View File

@@ -28,7 +28,7 @@ use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::{Expr, FunctionArg, FunctionArgExpr, Value as SqlValue};
use sql::ast::{Expr, FunctionArg, FunctionArgExpr, FunctionArguments, Value as SqlValue};
use sql::statements::admin::Admin;
use sql::statements::sql_value_to_value;
@@ -53,7 +53,13 @@ impl StatementExecutor {
.context(error::AdminFunctionNotFoundSnafu { name: func_name })?;
let signature = admin_func.signature();
let arg_values = func
let FunctionArguments::List(args) = &func.args else {
return error::BuildAdminFunctionArgsSnafu {
msg: format!("unsupported function args {}", func.args),
}
.fail();
};
let arg_values = args
.args
.iter()
.map(|arg| {
@@ -165,6 +171,8 @@ fn args_to_vector(
}
.fail()
}
TypeSignature::NullAry => Ok(vec![]),
}
}

View File

@@ -1477,7 +1477,7 @@ fn find_partition_entries(
for column in column_defs {
let column_name = &column.name;
let data_type = ConcreteDataType::from(
ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension)
.context(ColumnDataTypeSnafu)?,
);
column_name_and_type.insert(column_name, data_type);

View File

@@ -327,7 +327,7 @@ fn create_partitions_from_region_routes(
fn find_regions0(partition_rule: PartitionRuleRef, filter: &Expr) -> Result<HashSet<RegionNumber>> {
match filter {
Expr::BinaryExpr(BinaryExpr { left, op, right }) if op.is_comparison_operator() => {
Expr::BinaryExpr(BinaryExpr { left, op, right }) if op.supports_propagation() => {
let column_op_value = match (left.as_ref(), right.as_ref()) {
(Expr::Column(c), Expr::Literal(v)) => Some((&c.name, *op, v)),
(Expr::Literal(v), Expr::Column(c)) => Some((

View File

@@ -172,6 +172,25 @@ impl UserDefinedLogicalNodeCore for EmptyMetric {
}
}
impl PartialOrd for EmptyMetric {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
// Compare fields in order excluding schema fields
match self.start.partial_cmp(&other.start) {
Some(core::cmp::Ordering::Equal) => {}
ord => return ord,
}
match self.end.partial_cmp(&other.end) {
Some(core::cmp::Ordering::Equal) => {}
ord => return ord,
}
match self.interval.partial_cmp(&other.interval) {
Some(core::cmp::Ordering::Equal) => {}
ord => return ord,
}
self.expr.partial_cmp(&other.expr)
}
}
#[derive(Debug, Clone)]
pub struct EmptyMetricExec {
start: Millisecond,
@@ -247,6 +266,10 @@ impl ExecutionPlan for EmptyMetricExec {
column_statistics: Statistics::unknown_column(&self.schema()),
})
}
fn name(&self) -> &str {
"EmptyMetricExec"
}
}
impl DisplayAs for EmptyMetricExec {

View File

@@ -29,7 +29,7 @@ use datafusion::common::{ColumnStatistics, DFSchema, DFSchemaRef, Statistics};
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::TaskContext;
use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::{EquivalenceProperties, PhysicalSortRequirement};
use datafusion::physical_expr::{EquivalenceProperties, LexRequirement, PhysicalSortRequirement};
use datafusion::physical_plan::expressions::{CastExpr as PhyCast, Column as PhyColumn};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
@@ -218,6 +218,29 @@ impl HistogramFold {
}
}
impl PartialOrd for HistogramFold {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
// Compare fields in order excluding output_schema
match self.le_column.partial_cmp(&other.le_column) {
Some(core::cmp::Ordering::Equal) => {}
ord => return ord,
}
match self.ts_column.partial_cmp(&other.ts_column) {
Some(core::cmp::Ordering::Equal) => {}
ord => return ord,
}
match self.input.partial_cmp(&other.input) {
Some(core::cmp::Ordering::Equal) => {}
ord => return ord,
}
match self.field_column.partial_cmp(&other.field_column) {
Some(core::cmp::Ordering::Equal) => {}
ord => return ord,
}
self.quantile.partial_cmp(&other.quantile)
}
}
#[derive(Debug)]
pub struct HistogramFoldExec {
/// Index for `le` column in the schema of input.
@@ -241,7 +264,7 @@ impl ExecutionPlan for HistogramFoldExec {
&self.properties
}
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
let mut cols = self
.tag_col_exprs()
.into_iter()
@@ -274,7 +297,7 @@ impl ExecutionPlan for HistogramFoldExec {
}),
});
vec![Some(cols)]
vec![Some(LexRequirement::new(cols))]
}
fn required_input_distribution(&self) -> Vec<Distribution> {
@@ -352,10 +375,14 @@ impl ExecutionPlan for HistogramFoldExec {
column_statistics: vec![
ColumnStatistics::new_unknown();
// plus one more for the removed column by function `convert_schema`
self.schema().all_fields().len() + 1
self.schema().flattened_fields().len() + 1
],
})
}
fn name(&self) -> &str {
"HistogramFoldExec"
}
}
impl HistogramFoldExec {

View File

@@ -46,7 +46,7 @@ use crate::extension_plan::Millisecond;
/// This plan will try to align the input time series, for every timestamp between
/// `start` and `end` with step `interval`. Find in the `lookback` range if data
/// is missing at the given timestamp.
#[derive(Debug, PartialEq, Eq, Hash)]
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct InstantManipulate {
start: Millisecond,
end: Millisecond,
@@ -290,10 +290,14 @@ impl ExecutionPlan for InstantManipulateExec {
// TODO(ruihang): support this column statistics
column_statistics: vec![
ColumnStatistics::new_unknown();
self.schema().all_fields().len()
self.schema().flattened_fields().len()
],
})
}
fn name(&self) -> &str {
"InstantManipulateExec"
}
}
impl DisplayAs for InstantManipulateExec {

View File

@@ -47,7 +47,7 @@ use crate::extension_plan::Millisecond;
/// - bias sample's timestamp by offset
/// - sort the record batch based on timestamp column
/// - remove NaN values (optional)
#[derive(Debug, PartialEq, Eq, Hash)]
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct SeriesNormalize {
offset: Millisecond,
time_index_column_name: String,
@@ -229,6 +229,10 @@ impl ExecutionPlan for SeriesNormalizeExec {
fn statistics(&self) -> DataFusionResult<Statistics> {
self.input.statistics()
}
fn name(&self) -> &str {
"SeriesNormalizeExec"
}
}
impl DisplayAs for SeriesNormalizeExec {

View File

@@ -206,6 +206,37 @@ impl RangeManipulate {
}
}
impl PartialOrd for RangeManipulate {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
// Compare fields in order excluding output_schema
match self.start.partial_cmp(&other.start) {
Some(core::cmp::Ordering::Equal) => {}
ord => return ord,
}
match self.end.partial_cmp(&other.end) {
Some(core::cmp::Ordering::Equal) => {}
ord => return ord,
}
match self.interval.partial_cmp(&other.interval) {
Some(core::cmp::Ordering::Equal) => {}
ord => return ord,
}
match self.range.partial_cmp(&other.range) {
Some(core::cmp::Ordering::Equal) => {}
ord => return ord,
}
match self.time_index.partial_cmp(&other.time_index) {
Some(core::cmp::Ordering::Equal) => {}
ord => return ord,
}
match self.field_columns.partial_cmp(&other.field_columns) {
Some(core::cmp::Ordering::Equal) => {}
ord => return ord,
}
self.input.partial_cmp(&other.input)
}
}
impl UserDefinedLogicalNodeCore for RangeManipulate {
fn name(&self) -> &str {
Self::name()
@@ -385,6 +416,10 @@ impl ExecutionPlan for RangeManipulateExec {
column_statistics: Statistics::unknown_column(&self.schema()),
})
}
fn name(&self) -> &str {
"RangeManipulateExec"
}
}
impl DisplayAs for RangeManipulateExec {

Some files were not shown because too many files have changed in this diff Show More