Compare commits

...

7 Commits

Author SHA1 Message Date
Ning Sun
8853e08a7d chore: use pinned prost 0.14.1 2026-01-15 16:58:24 +08:00
Ning Sun
9cba14f904 chore: update otel librarires 2026-01-15 16:12:34 +08:00
Ning Sun
09ba24b7a9 chore: update otel-arrow 2026-01-15 15:46:40 +08:00
LFC
e64c31e59a chore: upgrade DataFusion family (#7558)
* chore: upgrade DataFusion family

Signed-off-by: luofucong <luofc@foxmail.com>

* use main proto

Signed-off-by: luofucong <luofc@foxmail.com>

* fix ci

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
2026-01-14 14:02:31 +00:00
Ruihang Xia
a5cb0116a2 perf: avoid boundary checks on accessing array items (#7570)
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2026-01-14 12:56:39 +00:00
Ruihang Xia
170f94fc08 feat: enable pruning for manipulate plans (#7565)
* feat: enable pruning for manipulate plans

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* apply to other plans and add sqlness case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix scalar manipulate and histogram fold for missing some columns

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* don't drop every columns

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unrelated part

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2026-01-14 08:32:51 +00:00
Yingwen
1c9aa59317 style: remove unused imports (#7567)
* style: remove unused imports

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: import only in test

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2026-01-14 07:59:40 +00:00
113 changed files with 2816 additions and 1639 deletions

1596
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -100,13 +100,13 @@ rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
# See for more detaiils: https://github.com/rust-lang/cargo/issues/11329
ahash = { version = "0.8", features = ["compile-time-rng"] }
aquamarine = "0.6"
arrow = { version = "56.2", features = ["prettyprint"] }
arrow-array = { version = "56.2", default-features = false, features = ["chrono-tz"] }
arrow-buffer = "56.2"
arrow-cast = "56.2"
arrow-flight = "56.2"
arrow-ipc = { version = "56.2", default-features = false, features = ["lz4", "zstd"] }
arrow-schema = { version = "56.2", features = ["serde"] }
arrow = { version = "57.0", features = ["prettyprint"] }
arrow-array = { version = "57.0", default-features = false, features = ["chrono-tz"] }
arrow-buffer = "57.0"
arrow-cast = "57.0"
arrow-flight = "57.0"
arrow-ipc = { version = "57.0", default-features = false, features = ["lz4", "zstd"] }
arrow-schema = { version = "57.0", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
# Remember to update axum-extra, axum-macros when updating axum
@@ -120,38 +120,39 @@ bitflags = "2.4.1"
bytemuck = "1.12"
bytes = { version = "1.7", features = ["serde"] }
chrono = { version = "0.4", features = ["serde"] }
chrono-tz = { version = "0.10.1", features = ["case-insensitive"] }
chrono-tz = { version = "0.10", features = ["case-insensitive"] }
clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
const_format = "0.2"
crossbeam-utils = "0.8"
dashmap = "6.1"
datafusion = "50"
datafusion-common = "50"
datafusion-expr = "50"
datafusion-functions = "50"
datafusion-functions-aggregate-common = "50"
datafusion-optimizer = "50"
datafusion-orc = "0.5"
datafusion-pg-catalog = "0.12.3"
datafusion-physical-expr = "50"
datafusion-physical-plan = "50"
datafusion-sql = "50"
datafusion-substrait = "50"
datafusion = "51.0"
datafusion-common = "51.0"
datafusion-datasource = "51.0"
datafusion-expr = "51.0"
datafusion-functions = "51.0"
datafusion-functions-aggregate-common = "51.0"
datafusion-optimizer = "51.0"
datafusion-orc = { git = "https://github.com/GreptimeTeam/datafusion-orc.git", rev = "35f2e04bf81f2ab7b6f86c0450d6a77b7098d43e" }
datafusion-pg-catalog = "0.13"
datafusion-physical-expr = "51.0"
datafusion-physical-plan = "51.0"
datafusion-sql = "51.0"
datafusion-substrait = "51.0"
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"
derive_more = { version = "2.1", features = ["full"] }
dotenv = "0.15"
either = "1.15"
etcd-client = { version = "0.16.1", features = [
etcd-client = { version = "0.17", features = [
"tls",
"tls-roots",
] }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "58aeee49267fb1eafa6f9123f9d0c47dd0f62722" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1353b0ada9e17890c7ba0e402ba29b2b57816ff1" }
hex = "0.4"
http = "1"
humantime = "2.1"
@@ -162,7 +163,7 @@ itertools = "0.14"
jsonb = { git = "https://github.com/databendlabs/jsonb.git", rev = "8c8d2fc294a39f3ff08909d60f718639cfba3875", default-features = false }
lazy_static = "1.4"
local-ip-address = "0.6"
loki-proto = { git = "https://github.com/GreptimeTeam/loki-proto.git", rev = "3b7cd33234358b18ece977bf689dc6fb760f29ab" }
loki-proto = { git = "https://github.com/GreptimeTeam/loki-proto.git", rev = "f69c8924c4babe516373e26a4118be82d976629c" }
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "5618e779cf2bb4755b499c630fba4c35e91898cb" }
mockall = "0.13"
moka = "0.12"
@@ -172,7 +173,7 @@ notify = "8.0"
num_cpus = "1.16"
object_store_opendal = "0.54"
once_cell = "1.18"
opentelemetry-proto = { version = "0.30", features = [
opentelemetry-proto = { version = "0.31", features = [
"gen-tonic",
"metrics",
"trace",
@@ -180,18 +181,18 @@ opentelemetry-proto = { version = "0.30", features = [
"logs",
] }
ordered-float = { version = "4.3", features = ["serde"] }
otel-arrow-rust = { git = "https://github.com/GreptimeTeam/otel-arrow", rev = "2d64b7c0fa95642028a8205b36fe9ea0b023ec59", features = [
otel-arrow-rust = { git = "https://github.com/GreptimeTeam/otel-arrow", rev = "452821e455b16e9a397a09d299340e197eb91571", features = [
"server",
] }
parking_lot = "0.12"
parquet = { version = "56.2", default-features = false, features = ["arrow", "async", "object_store"] }
parquet = { version = "57.0", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
pin-project = "1.0"
pretty_assertions = "1.4.0"
prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { version = "0.7.1", features = ["ser"] }
prost = { version = "0.13", features = ["no-recursion-limit"] }
prost-types = "0.13"
prost = { version = "=0.14.1", features = ["no-recursion-limit"] }
prost-types = "=0.14.1"
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.9"
ratelimit = "0.10"
@@ -223,7 +224,7 @@ simd-json = "0.15"
similar-asserts = "1.6.0"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
sqlparser = { version = "0.58.0", default-features = false, features = ["std", "visitor", "serde"] }
sqlparser = { version = "0.59.0", default-features = false, features = ["std", "visitor", "serde"] }
sqlx = { version = "0.8", default-features = false, features = ["any", "macros", "json", "runtime-tokio-rustls"] }
strum = { version = "0.27", features = ["derive"] }
sysinfo = "0.33"
@@ -234,12 +235,12 @@ tokio-rustls = { version = "0.26.2", default-features = false }
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.8.8"
tonic = { version = "0.13", features = ["tls-ring", "gzip", "zstd"] }
tonic = { version = "0.14", features = ["tls-ring", "gzip", "zstd"] }
tower = "0.5"
tower-http = "0.6"
tracing = "0.1"
tracing-appender = "0.2"
tracing-opentelemetry = "0.31.0"
tracing-opentelemetry = "0.32.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt"] }
typetag = "0.2"
uuid = { version = "1.17", features = ["serde", "v4", "fast-rng"] }
@@ -322,19 +323,20 @@ git = "https://github.com/GreptimeTeam/greptime-meter.git"
rev = "5618e779cf2bb4755b499c630fba4c35e91898cb"
[patch.crates-io]
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "a0ce2bc6eb3e804532932f39833c32432f5c9a39" } # branch = "v0.58.x"
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-pg-catalog = { git = "https://github.com/GreptimeTeam/datafusion-postgres.git", rev = "74ac8e2806be6de91ff192b97f64735392539d16" }
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7143b2fc4492a7970774583ed0997a459f3e5c05" }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "d7d95a44889e099e32d78e9bad9bc00598faef28" } # on branch v0.59.x
[profile.release]
debug = 1

View File

@@ -35,6 +35,7 @@ use mito2::sst::parquet::reader::ParquetReaderBuilder;
use mito2::sst::parquet::{PARQUET_METADATA_KEY, WriteOptions};
use mito2::worker::write_cache_from_config;
use object_store::ObjectStore;
use parquet::file::metadata::{FooterTail, KeyValue};
use regex::Regex;
use snafu::OptionExt;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
@@ -463,7 +464,6 @@ fn extract_region_metadata(
file_path: &str,
meta: &parquet::file::metadata::ParquetMetaData,
) -> error::Result<RegionMetadataRef> {
use parquet::format::KeyValue;
let kvs: Option<&Vec<KeyValue>> = meta.file_metadata().key_value_metadata();
let Some(kvs) = kvs else {
return Err(error::IllegalConfigSnafu {
@@ -608,7 +608,7 @@ async fn load_parquet_metadata(
let buffer_len = buffer.len();
let mut footer = [0; 8];
footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
let footer = ParquetMetaDataReader::decode_footer_tail(&footer)?;
let footer = FooterTail::try_new(&footer)?;
let metadata_len = footer.metadata_length() as u64;
if actual_size - (FOOTER_SIZE as u64) < metadata_len {
return Err("invalid footer/metadata length".into());

View File

@@ -27,13 +27,14 @@ common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
datafusion.workspace = true
datafusion-datasource.workspace = true
datafusion-orc.workspace = true
datatypes.workspace = true
futures.workspace = true
lazy_static.workspace = true
object-store.workspace = true
object_store_opendal.workspace = true
orc-rust = { version = "0.6.3", default-features = false, features = ["async"] }
orc-rust = { version = "0.7", default-features = false, features = ["async"] }
parquet.workspace = true
paste.workspace = true
regex.workspace = true

View File

@@ -14,7 +14,7 @@
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion::parquet::format::FileMetaData;
use parquet::file::metadata::ParquetMetaData;
use crate::error::Result;
@@ -24,5 +24,5 @@ pub trait DfRecordBatchEncoder {
#[async_trait]
pub trait ArrowWriterCloser {
async fn close(mut self) -> Result<FileMetaData>;
async fn close(mut self) -> Result<ParquetMetaData>;
}

View File

@@ -40,7 +40,6 @@ use datafusion::datasource::physical_plan::{
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datatypes::arrow::datatypes::SchemaRef;
use futures::{StreamExt, TryStreamExt};
use object_store::ObjectStore;
use object_store_opendal::OpendalStore;
@@ -303,24 +302,20 @@ where
pub async fn file_to_stream(
store: &ObjectStore,
filename: &str,
file_schema: SchemaRef,
file_source: Arc<dyn FileSource>,
projection: Option<Vec<usize>>,
compression_type: CompressionType,
) -> Result<DfSendableRecordBatchStream> {
let df_compression: DfCompressionType = compression_type.into();
let config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
file_source.clone(),
)
.with_file_group(FileGroup::new(vec![PartitionedFile::new(
filename.to_string(),
0,
)]))
.with_projection(projection)
.with_file_compression_type(df_compression)
.build();
let config =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source.clone())
.with_file_group(FileGroup::new(vec![PartitionedFile::new(
filename.to_string(),
0,
)]))
.with_projection_indices(projection)
.with_file_compression_type(df_compression)
.build();
let store = Arc::new(OpendalStore::new(store.clone()));
let file_opener = file_source

View File

@@ -440,14 +440,11 @@ mod tests {
.await
.unwrap(),
);
let csv_source = CsvSource::new(true, b',', b'"')
.with_schema(schema.clone())
.with_batch_size(8192);
let csv_source = CsvSource::new(schema).with_batch_size(8192);
let stream = file_to_stream(
&store,
compressed_file_path_str,
schema.clone(),
csv_source.clone(),
None,
compression_type,

View File

@@ -347,14 +347,11 @@ mod tests {
.await
.unwrap(),
);
let json_source = JsonSource::new()
.with_schema(schema.clone())
.with_batch_size(8192);
let json_source = JsonSource::new(schema).with_batch_size(8192);
let stream = file_to_stream(
&store,
compressed_file_path_str,
schema.clone(),
json_source.clone(),
None,
compression_type,

View File

@@ -18,15 +18,15 @@ use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion::datasource::physical_plan::{FileMeta, ParquetFileReaderFactory};
use datafusion::datasource::physical_plan::ParquetFileReaderFactory;
use datafusion::error::Result as DatafusionResult;
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
use datafusion::parquet::arrow::{ArrowWriter, parquet_to_arrow_schema};
use datafusion::parquet::errors::{ParquetError, Result as ParquetResult};
use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::format::FileMetaData;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_datasource::PartitionedFile;
use datatypes::schema::SchemaRef;
use futures::StreamExt;
use futures::future::BoxFuture;
@@ -100,11 +100,11 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
fn create_reader(
&self,
_partition_index: usize,
file_meta: FileMeta,
partitioned_file: PartitionedFile,
_metadata_size_hint: Option<usize>,
_metrics: &ExecutionPlanMetricsSet,
) -> DatafusionResult<Box<dyn AsyncFileReader + Send>> {
let path = file_meta.location().to_string();
let path = partitioned_file.path().to_string();
let object_store = self.object_store.clone();
Ok(Box::new(LazyParquetFileReader::new(object_store, path)))
@@ -180,7 +180,7 @@ impl DfRecordBatchEncoder for ArrowWriter<SharedBuffer> {
#[async_trait]
impl ArrowWriterCloser for ArrowWriter<SharedBuffer> {
async fn close(self) -> Result<FileMetaData> {
async fn close(self) -> Result<ParquetMetaData> {
self.close().context(error::EncodeRecordBatchSnafu)
}
}

View File

@@ -67,14 +67,14 @@ impl Test<'_> {
async fn test_json_opener() {
let store = test_store("/");
let schema = test_basic_schema();
let file_source = Arc::new(JsonSource::new()).with_batch_size(test_util::TEST_BATCH_SIZE);
let file_source = Arc::new(JsonSource::new(schema)).with_batch_size(test_util::TEST_BATCH_SIZE);
let path = &find_workspace_path("/src/common/datasource/tests/json/basic.json")
.display()
.to_string();
let tests = [
Test {
config: scan_config(schema.clone(), None, path, file_source.clone()),
config: scan_config(None, path, file_source.clone()),
file_source: file_source.clone(),
expected: vec![
"+-----+-------+",
@@ -87,7 +87,7 @@ async fn test_json_opener() {
],
},
Test {
config: scan_config(schema, Some(1), path, file_source.clone()),
config: scan_config(Some(1), path, file_source.clone()),
file_source,
expected: vec![
"+-----+------+",
@@ -112,13 +112,11 @@ async fn test_csv_opener() {
.display()
.to_string();
let file_source = CsvSource::new(true, b',', b'"')
.with_batch_size(test_util::TEST_BATCH_SIZE)
.with_schema(schema.clone());
let file_source = CsvSource::new(schema).with_batch_size(test_util::TEST_BATCH_SIZE);
let tests = [
Test {
config: scan_config(schema.clone(), None, path, file_source.clone()),
config: scan_config(None, path, file_source.clone()),
file_source: file_source.clone(),
expected: vec![
"+-----+-------+---------------------+----------+------------+",
@@ -131,7 +129,7 @@ async fn test_csv_opener() {
],
},
Test {
config: scan_config(schema, Some(1), path, file_source.clone()),
config: scan_config(Some(1), path, file_source.clone()),
file_source,
expected: vec![
"+-----+------+---------------------+----------+------------+",
@@ -158,10 +156,10 @@ async fn test_parquet_exec() {
.display()
.to_string();
let parquet_source = ParquetSource::default()
let parquet_source = ParquetSource::new(schema)
.with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(store)));
let config = scan_config(schema, None, path, Arc::new(parquet_source));
let config = scan_config(None, path, Arc::new(parquet_source));
let exec = DataSourceExec::from_data_source(config);
let ctx = SessionContext::new();
@@ -197,11 +195,11 @@ async fn test_orc_opener() {
let store = test_store("/");
let schema = Arc::new(OrcFormat.infer_schema(&store, path).await.unwrap());
let file_source = Arc::new(OrcSource::default());
let file_source = Arc::new(OrcSource::new(schema.into()));
let tests = [
Test {
config: scan_config(schema.clone(), None, path, file_source.clone()),
config: scan_config(None, path, file_source.clone()),
file_source: file_source.clone(),
expected: vec![
"+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+",
@@ -216,7 +214,7 @@ async fn test_orc_opener() {
],
},
Test {
config: scan_config(schema.clone(), Some(1), path, file_source.clone()),
config: scan_config(Some(1), path, file_source.clone()),
file_source,
expected: vec![
"+----------+-----+------+------------+---+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+-------------------------+-------------+",

View File

@@ -80,7 +80,6 @@ pub fn csv_basic_schema() -> SchemaRef {
}
pub(crate) fn scan_config(
file_schema: SchemaRef,
limit: Option<usize>,
filename: &str,
file_source: Arc<dyn FileSource>,
@@ -89,7 +88,7 @@ pub(crate) fn scan_config(
let filename = &filename.replace('\\', "/");
let file_group = FileGroup::new(vec![PartitionedFile::new(filename.clone(), 4096)]);
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_schema, file_source)
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
.with_file_group(file_group)
.with_limit(limit)
.build()
@@ -109,7 +108,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
let size = store.read(origin_path).await.unwrap().len();
let config = scan_config(schema, None, origin_path, Arc::new(JsonSource::new()));
let config = scan_config(None, origin_path, Arc::new(JsonSource::new(schema)));
let stream = FileStream::new(
&config,
0,
@@ -151,10 +150,8 @@ pub async fn setup_stream_to_csv_test(
let schema = csv_basic_schema();
let csv_source = CsvSource::new(true, b',', b'"')
.with_schema(schema.clone())
.with_batch_size(TEST_BATCH_SIZE);
let config = scan_config(schema, None, origin_path, csv_source.clone());
let csv_source = CsvSource::new(schema).with_batch_size(TEST_BATCH_SIZE);
let config = scan_config(None, origin_path, csv_source.clone());
let size = store.read(origin_path).await.unwrap().len();
let csv_opener = csv_source.create_file_opener(

View File

@@ -104,7 +104,8 @@ mod tests {
assert!(matches!(f.signature(),
datafusion_expr::Signature {
type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
volatility: datafusion_expr::Volatility::Immutable
volatility: datafusion_expr::Volatility::Immutable,
..
} if valid_types == &ConcreteDataType::numerics().into_iter().map(|dt| { use datatypes::data_type::DataType; dt.as_arrow_type() }).collect::<Vec<_>>()));
}

View File

@@ -331,7 +331,8 @@ mod tests {
assert!(matches!(f.signature(),
datafusion_expr::Signature {
type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
volatility: datafusion_expr::Volatility::Immutable
volatility: datafusion_expr::Volatility::Immutable,
..
} if valid_types == &vec![ArrowDataType::Utf8]));
}

View File

@@ -145,7 +145,8 @@ mod tests {
assert!(matches!(f.signature(),
datafusion_expr::Signature {
type_signature: datafusion_expr::TypeSignature::OneOf(sigs),
volatility: datafusion_expr::Volatility::Immutable
volatility: datafusion_expr::Volatility::Immutable,
..
} if sigs.len() == 2));
}

View File

@@ -341,6 +341,7 @@ impl AggregateUDFImpl for StateWrapper {
name: acc_args.name,
is_distinct: acc_args.is_distinct,
exprs: acc_args.exprs,
expr_fields: acc_args.expr_fields,
};
self.inner.accumulator(acc_args)?
};

View File

@@ -650,7 +650,7 @@ async fn test_last_value_order_by_udaf() {
DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
true
), // ordering field is added to state fields too
Field::new("is_set", DataType::Boolean, true)
Field::new("last_value[last_value_is_set]", DataType::Boolean, true)
]
.into()
),
@@ -735,7 +735,7 @@ async fn test_last_value_order_by_udaf() {
DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
true,
),
Field::new("is_set", DataType::Boolean, true),
Field::new("last_value[last_value_is_set]", DataType::Boolean, true),
]
.into(),
vec![

View File

@@ -453,8 +453,8 @@ impl Accumulator for CountHashAccumulator {
);
};
let hash_array = inner_array.as_any().downcast_ref::<UInt64Array>().unwrap();
for i in 0..hash_array.len() {
self.values.insert(hash_array.value(i));
for &hash in hash_array.values().iter().take(hash_array.len()) {
self.values.insert(hash);
}
}
Ok(())

View File

@@ -152,9 +152,9 @@ impl DfAccumulator for JsonEncodePathAccumulator {
let lng_array = lng_array.as_primitive::<Float64Type>();
let mut coords = Vec::with_capacity(len);
for i in 0..len {
let lng = lng_array.value(i);
let lat = lat_array.value(i);
let lng_values = lng_array.values();
let lat_values = lat_array.values();
for (&lng, &lat) in lng_values.iter().zip(lat_values.iter()).take(len) {
coords.push(vec![lng, lat]);
}

View File

@@ -122,7 +122,8 @@ mod tests {
matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable
volatility: Volatility::Immutable,
..
} if sigs.len() == 15),
"{:?}",
f.signature()

View File

@@ -193,7 +193,8 @@ mod tests {
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable
volatility: Volatility::Immutable,
..
} if sigs.len() == 6));
}

View File

@@ -120,7 +120,8 @@ mod tests {
matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable
volatility: Volatility::Immutable,
..
} if sigs.len() == 15),
"{:?}",
f.signature()

View File

@@ -25,7 +25,6 @@ use datafusion_common::arrow::array::{
};
use datafusion_common::arrow::datatypes::DataType;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::type_coercion::aggregates::STRINGS;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
use datatypes::arrow_array::{int_array_value_at_index, string_array_value_at_index};
use datatypes::json::JsonStructureSettings;
@@ -519,7 +518,7 @@ impl Default for JsonGetObject {
DataType::LargeBinary,
DataType::BinaryView,
],
STRINGS.to_vec(),
vec![DataType::UInt8, DataType::LargeUtf8, DataType::Utf8View],
),
}
}

View File

@@ -99,7 +99,8 @@ mod tests {
assert!(matches!(rate.signature(),
Signature {
type_signature: TypeSignature::Uniform(2, valid_types),
volatility: Volatility::Immutable
volatility: Volatility::Immutable,
..
} if valid_types == NUMERICS
));
let values = vec![1.0, 3.0, 6.0];

View File

@@ -208,9 +208,9 @@ fn decode_dictionary(
let mut rows = Vec::with_capacity(number_rows);
let keys = dict.keys();
for i in 0..number_rows {
let dict_index = keys.value(i) as usize;
rows.push(decoded_values[dict_index].clone());
let dict_indices = keys.values();
for &dict_index in dict_indices[..number_rows].iter() {
rows.push(decoded_values[dict_index as usize].clone());
}
Ok(rows)

View File

@@ -19,8 +19,10 @@ use datafusion_common::DataFusionError;
use datafusion_common::arrow::array::{Array, AsArray, StringViewBuilder};
use datafusion_common::arrow::compute;
use datafusion_common::arrow::datatypes::DataType;
use datafusion_expr::type_coercion::aggregates::BINARYS;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use datafusion_common::types::logical_binary;
use datafusion_expr::{
Coercion, ColumnarValue, ScalarFunctionArgs, Signature, TypeSignatureClass, Volatility,
};
use datatypes::types::vector_type_value_to_string;
use crate::function::{Function, extract_args};
@@ -35,11 +37,10 @@ pub struct VectorToStringFunction {
impl Default for VectorToStringFunction {
fn default() -> Self {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
TypeSignature::Uniform(1, BINARYS.to_vec()),
],
signature: Signature::coercible(
vec![Coercion::new_exact(TypeSignatureClass::Native(
logical_binary(),
))],
Volatility::Immutable,
),
}

View File

@@ -15,10 +15,10 @@
use std::fmt::Display;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::ColumnarValue;
use datafusion::logical_expr::{Coercion, ColumnarValue, TypeSignature, TypeSignatureClass};
use datafusion_common::ScalarValue;
use datafusion_expr::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::{ScalarFunctionArgs, Signature, Volatility};
use nalgebra::DVectorView;
use crate::function::Function;
@@ -36,9 +36,12 @@ impl Default for ElemAvgFunction {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_binary()),
)]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_string()),
)]),
],
Volatility::Immutable,
),

View File

@@ -15,10 +15,10 @@
use std::fmt::Display;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::ColumnarValue;
use datafusion::logical_expr_common::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion::logical_expr::{Coercion, ColumnarValue, TypeSignature, TypeSignatureClass};
use datafusion_common::ScalarValue;
use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::{ScalarFunctionArgs, Signature, Volatility};
use nalgebra::DVectorView;
use crate::function::Function;
@@ -49,9 +49,12 @@ impl Default for ElemProductFunction {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_binary()),
)]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_string()),
)]),
],
Volatility::Immutable,
),

View File

@@ -15,9 +15,9 @@
use std::fmt::Display;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::ColumnarValue;
use datafusion::logical_expr::{Coercion, ColumnarValue, TypeSignatureClass};
use datafusion_common::ScalarValue;
use datafusion_expr::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use nalgebra::DVectorView;
@@ -36,9 +36,12 @@ impl Default for ElemSumFunction {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_binary()),
)]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_string()),
)]),
],
Volatility::Immutable,
),

View File

@@ -15,9 +15,9 @@
use std::fmt::Display;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::ColumnarValue;
use datafusion::logical_expr_common::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion::logical_expr::{Coercion, ColumnarValue, TypeSignatureClass};
use datafusion_common::ScalarValue;
use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use crate::function::Function;
@@ -49,8 +49,12 @@ impl Default for VectorDimFunction {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_binary()),
)]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_string()),
)]),
],
Volatility::Immutable,
),

View File

@@ -15,9 +15,9 @@
use std::fmt::Display;
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::ColumnarValue;
use datafusion::logical_expr_common::type_coercion::aggregates::{BINARYS, STRINGS};
use datafusion::logical_expr::{Coercion, ColumnarValue, TypeSignatureClass};
use datafusion_common::ScalarValue;
use datafusion_common::types::{logical_binary, logical_string};
use datafusion_expr::{ScalarFunctionArgs, Signature, TypeSignature, Volatility};
use nalgebra::DVectorView;
@@ -52,9 +52,12 @@ impl Default for VectorNormFunction {
Self {
signature: Signature::one_of(
vec![
TypeSignature::Uniform(1, STRINGS.to_vec()),
TypeSignature::Uniform(1, BINARYS.to_vec()),
TypeSignature::Uniform(1, vec![DataType::BinaryView]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_binary()),
)]),
TypeSignature::Coercible(vec![Coercion::new_exact(
TypeSignatureClass::Native(logical_string()),
)]),
],
Volatility::Immutable,
),

View File

@@ -106,7 +106,8 @@ mod tests {
assert!(matches!(f.signature(),
datafusion_expr::Signature {
type_signature: datafusion_expr::TypeSignature::Uniform(1, valid_types),
volatility: datafusion_expr::Volatility::Immutable
volatility: datafusion_expr::Volatility::Immutable,
..
} if valid_types == &vec![ArrowDataType::Utf8]));
}

View File

@@ -103,10 +103,11 @@ impl FlightEncoder {
FlightMessage::RecordBatch(record_batch) => {
let (encoded_dictionaries, encoded_batch) = self
.data_gen
.encoded_batch(
.encode(
&record_batch,
&mut self.dictionary_tracker,
&self.write_options,
&mut Default::default(),
)
.expect("DictionaryTracker configured above to not fail on replacement");

View File

@@ -7,7 +7,6 @@ license.workspace = true
[dependencies]
common-error.workspace = true
common-macro.workspace = true
prost.workspace = true
snafu.workspace = true
tokio.workspace = true

View File

@@ -21,12 +21,12 @@ greptime-proto.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
once_cell.workspace = true
opentelemetry = { version = "0.30.0", default-features = false, features = [
opentelemetry = { version = "0.31.0", default-features = false, features = [
"trace",
] }
opentelemetry-otlp = { version = "0.30.0", features = ["trace", "grpc-tonic", "http-proto"] }
opentelemetry-semantic-conventions = { version = "0.30.0", features = ["semconv_experimental"] }
opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio", "trace"] }
opentelemetry-otlp = { version = "0.31.0", features = ["trace", "grpc-tonic", "http-proto"] }
opentelemetry-semantic-conventions = { version = "0.31.0", features = ["semconv_experimental"] }
opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio", "trace"] }
parking_lot.workspace = true
prometheus.workspace = true
serde.workspace = true

View File

@@ -73,7 +73,7 @@ impl TracingContext {
/// Attach the given span as a child of the context. Returns the attached span.
pub fn attach(&self, span: tracing::Span) -> tracing::Span {
span.set_parent(self.0.clone());
let _ = span.set_parent(self.0.clone());
span
}

View File

@@ -1145,10 +1145,11 @@ impl TryFrom<ScalarValue> for Value {
ScalarValue::List(array) => {
// this is for item type
let datatype = ConcreteDataType::try_from(&array.value_type())?;
let items = ScalarValue::convert_array_to_scalar_vec(array.as_ref())
.context(ConvertArrowArrayToScalarsSnafu)?
let scalar_values = ScalarValue::convert_array_to_scalar_vec(array.as_ref())
.context(ConvertArrowArrayToScalarsSnafu)?;
let items = scalar_values
.into_iter()
.flatten()
.flat_map(|v| v.unwrap_or_else(|| vec![ScalarValue::Null]))
.map(|x| x.try_into())
.collect::<Result<Vec<Value>>>()?;
Value::List(ListValue::new(items, Arc::new(datatype)))
@@ -2997,6 +2998,7 @@ pub(crate) mod tests {
.unwrap()
.into_iter()
.flatten()
.flatten()
.collect::<Vec<_>>();
assert_eq!(
vs,

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::any::Any;
use std::borrow::Borrow;
use std::sync::Arc;
use arrow::array::{Array, ArrayBuilder, ArrayIter, ArrayRef, BooleanArray, BooleanBuilder};
@@ -69,8 +68,8 @@ impl From<Vec<Option<bool>>> for BooleanVector {
}
}
impl<Ptr: Borrow<Option<bool>>> FromIterator<Ptr> for BooleanVector {
fn from_iter<I: IntoIterator<Item = Ptr>>(iter: I) -> Self {
impl FromIterator<Option<bool>> for BooleanVector {
fn from_iter<T: IntoIterator<Item = Option<bool>>>(iter: T) -> Self {
BooleanVector {
array: BooleanArray::from_iter(iter),
}
@@ -303,7 +302,7 @@ mod tests {
#[test]
fn test_boolean_vector_from_iter() {
let input = vec![Some(false), Some(true), Some(false), Some(true)];
let vec = input.iter().collect::<BooleanVector>();
let vec = input.iter().cloned().collect::<BooleanVector>();
assert_eq!(4, vec.len());
for (i, v) in input.into_iter().enumerate() {
assert_eq!(v, vec.get_data(i), "Failed at {i}")

View File

@@ -83,8 +83,6 @@ impl Decimal128Vector {
/// For example:
/// value = 12345, precision = 3, return error.
pub fn with_precision_and_scale(self, precision: u8, scale: i8) -> Result<Self> {
// validate if precision is too small
self.validate_decimal_precision(precision)?;
let array = self
.array
.with_precision_and_scale(precision, scale)
@@ -124,7 +122,7 @@ impl Decimal128Vector {
}
/// Validate decimal precision, if precision is invalid, return error.
fn validate_decimal_precision(&self, precision: u8) -> Result<()> {
pub fn validate_decimal_precision(&self, precision: u8) -> Result<()> {
self.array
.validate_decimal_precision(precision)
.context(ValueExceedsPrecisionSnafu { precision })
@@ -564,7 +562,9 @@ pub mod tests {
let decimal_vector = decimal_builder.finish();
assert_eq!(decimal_vector.precision(), 38);
assert_eq!(decimal_vector.scale(), 10);
let result = decimal_vector.with_precision_and_scale(3, 2);
let result = decimal_vector
.with_precision_and_scale(3, 2)
.and_then(|x| x.validate_decimal_precision(3));
assert_eq!(
"Value exceeds the precision 3 bound",
result.unwrap_err().to_string()

View File

@@ -170,11 +170,12 @@ impl<K: ArrowDictionaryKeyType> Serializable for DictionaryVector<K> {
// the value it refers to in the dictionary
let mut result = Vec::with_capacity(self.len());
for i in 0..self.len() {
let keys = self.array.keys();
let key_values = &keys.values()[..self.len()];
for (i, &key) in key_values.iter().enumerate() {
if self.is_null(i) {
result.push(JsonValue::Null);
} else {
let key = self.array.keys().value(i);
let value = self.item_vector.get(key.as_usize());
let json_value = serde_json::to_value(value).context(error::SerializeSnafu)?;
result.push(json_value);
@@ -247,16 +248,9 @@ impl<K: ArrowDictionaryKeyType> VectorOp for DictionaryVector<K> {
let mut replicated_keys = PrimitiveBuilder::new();
let mut previous_offset = 0;
for (i, &offset) in offsets.iter().enumerate() {
let key = if i < self.len() {
if keys.is_valid(i) {
Some(keys.value(i))
} else {
None
}
} else {
None
};
let mut key_iter = keys.iter().chain(std::iter::repeat(None));
for &offset in offsets {
let key = key_iter.next().unwrap();
// repeat this key (offset - previous_offset) times
let repeat_count = offset - previous_offset;

View File

@@ -170,10 +170,11 @@ impl Helper {
ScalarValue::List(array) => {
let item_type = Arc::new(ConcreteDataType::try_from(&array.value_type())?);
let mut builder = ListVectorBuilder::with_type_capacity(item_type.clone(), 1);
let values = ScalarValue::convert_array_to_scalar_vec(array.as_ref())
.context(ConvertArrowArrayToScalarsSnafu)?
let scalar_values = ScalarValue::convert_array_to_scalar_vec(array.as_ref())
.context(ConvertArrowArrayToScalarsSnafu)?;
let values = scalar_values
.into_iter()
.flatten()
.flat_map(|v| v.unwrap_or_else(|| vec![ScalarValue::Null]))
.map(ScalarValue::try_into)
.collect::<Result<Vec<Value>>>()?;
builder.push(Some(ListValueRef::Ref {

View File

@@ -18,6 +18,7 @@ use common_datasource::file_format::Format;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::parquet::DefaultParquetFileReaderFactory;
use datafusion::common::ToDFSchema;
use datafusion::config::CsvOptions;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
@@ -34,7 +35,6 @@ use datafusion::prelude::SessionContext;
use datafusion_expr::expr::Expr;
use datafusion_expr::utils::conjunction;
use datafusion_orc::OrcSource;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
use snafu::ResultExt;
@@ -45,7 +45,6 @@ const DEFAULT_BATCH_SIZE: usize = 8192;
fn build_record_batch_stream(
scan_plan_config: &ScanPlanConfig,
file_schema: Arc<ArrowSchema>,
limit: Option<usize>,
file_source: Arc<dyn FileSource>,
) -> Result<DfSendableRecordBatchStream> {
@@ -55,15 +54,12 @@ fn build_record_batch_stream(
.map(|filename| PartitionedFile::new(filename.clone(), 0))
.collect::<Vec<_>>();
let config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
file_source.clone(),
)
.with_projection(scan_plan_config.projection.cloned())
.with_limit(limit)
.with_file_group(FileGroup::new(files))
.build();
let config =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source.clone())
.with_projection_indices(scan_plan_config.projection.cloned())
.with_limit(limit)
.with_file_group(FileGroup::new(files))
.build();
let store = Arc::new(object_store_opendal::OpendalStore::new(
scan_plan_config.store.clone(),
@@ -89,11 +85,14 @@ fn new_csv_stream(
// push down limit only if there is no filter
let limit = config.filters.is_empty().then_some(config.limit).flatten();
let csv_source = CsvSource::new(format.has_header, format.delimiter, b'"')
.with_schema(file_schema.clone())
let options = CsvOptions::default()
.with_has_header(format.has_header)
.with_delimiter(format.delimiter);
let csv_source = CsvSource::new(file_schema)
.with_csv_options(options)
.with_batch_size(DEFAULT_BATCH_SIZE);
build_record_batch_stream(config, file_schema, limit, csv_source)
build_record_batch_stream(config, limit, csv_source)
}
fn new_json_stream(config: &ScanPlanConfig) -> Result<DfSendableRecordBatchStream> {
@@ -102,8 +101,8 @@ fn new_json_stream(config: &ScanPlanConfig) -> Result<DfSendableRecordBatchStrea
// push down limit only if there is no filter
let limit = config.filters.is_empty().then_some(config.limit).flatten();
let file_source = JsonSource::new().with_batch_size(DEFAULT_BATCH_SIZE);
build_record_batch_stream(config, file_schema, limit, file_source)
let file_source = JsonSource::new(file_schema).with_batch_size(DEFAULT_BATCH_SIZE);
build_record_batch_stream(config, limit, file_source)
}
fn new_parquet_stream_with_exec_plan(
@@ -126,9 +125,10 @@ fn new_parquet_stream_with_exec_plan(
.collect::<Vec<_>>(),
);
let mut parquet_source = ParquetSource::default().with_parquet_file_reader_factory(Arc::new(
DefaultParquetFileReaderFactory::new(store.clone()),
));
let mut parquet_source = ParquetSource::new(file_schema.clone())
.with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(
store.clone(),
)));
// build predicate filter
let filters = filters.to_vec();
@@ -143,15 +143,12 @@ fn new_parquet_stream_with_exec_plan(
parquet_source = parquet_source.with_predicate(filters);
};
let file_scan_config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
Arc::new(parquet_source),
)
.with_file_group(file_group)
.with_projection(projection.cloned())
.with_limit(*limit)
.build();
let file_scan_config =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), Arc::new(parquet_source))
.with_file_group(file_group)
.with_projection_indices(projection.cloned())
.with_limit(*limit)
.build();
// TODO(ruihang): get this from upper layer
let task_ctx = SessionContext::default().task_ctx();
@@ -170,8 +167,8 @@ fn new_orc_stream(config: &ScanPlanConfig) -> Result<DfSendableRecordBatchStream
// push down limit only if there is no filter
let limit = config.filters.is_empty().then_some(config.limit).flatten();
let file_source = OrcSource::default().with_batch_size(DEFAULT_BATCH_SIZE);
build_record_batch_stream(config, file_schema, limit, file_source)
let file_source = OrcSource::new(file_schema.into()).with_batch_size(DEFAULT_BATCH_SIZE);
build_record_batch_stream(config, limit, file_source)
}
#[derive(Debug, Clone)]

View File

@@ -14,7 +14,7 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::time::Instant;
use common_meta::datanode::RegionManifestInfo;
use common_meta::peer::Peer;
@@ -22,9 +22,7 @@ use common_telemetry::init_default_ut_logging;
use store_api::region_engine::RegionRole;
use store_api::storage::{FileId, FileRefsManifest, GcReport, RegionId};
use crate::gc::mock::{
MockSchedulerCtx, TEST_REGION_SIZE_200MB, mock_region_stat, new_empty_report_with,
};
use crate::gc::mock::{MockSchedulerCtx, TEST_REGION_SIZE_200MB, mock_region_stat};
use crate::gc::{GcScheduler, GcSchedulerOptions};
// Integration Flow Tests
@@ -135,6 +133,10 @@ async fn test_full_gc_workflow() {
#[cfg(target_os = "linux")]
#[tokio::test]
async fn test_tracker_cleanup() {
use std::time::Duration;
use crate::gc::mock::new_empty_report_with;
init_default_ut_logging();
let table_id = 1;

View File

@@ -35,7 +35,7 @@ use index::result_cache::IndexResultCache;
use moka::notification::RemovalCause;
use moka::sync::Cache;
use object_store::ObjectStore;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
@@ -85,13 +85,13 @@ impl CacheStrategy {
&self,
file_id: RegionFileId,
metrics: &mut MetadataCacheMetrics,
page_index_policy: PageIndexPolicy,
) -> Option<Arc<ParquetMetaData>> {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.get_parquet_meta_data(file_id, metrics).await
}
CacheStrategy::Compaction(cache_manager) => {
cache_manager.get_parquet_meta_data(file_id, metrics).await
CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
cache_manager
.get_parquet_meta_data(file_id, metrics, page_index_policy)
.await
}
CacheStrategy::Disabled => {
metrics.cache_miss += 1;
@@ -340,6 +340,7 @@ impl CacheManager {
&self,
file_id: RegionFileId,
metrics: &mut MetadataCacheMetrics,
page_index_policy: PageIndexPolicy,
) -> Option<Arc<ParquetMetaData>> {
// Try to get metadata from sst meta cache
if let Some(metadata) = self.get_parquet_meta_data_from_mem_cache(file_id) {
@@ -352,7 +353,7 @@ impl CacheManager {
if let Some(write_cache) = &self.write_cache
&& let Some(metadata) = write_cache
.file_cache()
.get_parquet_meta_data(key, metrics)
.get_parquet_meta_data(key, metrics, page_index_policy)
.await
{
metrics.file_cache_hit += 1;
@@ -893,7 +894,7 @@ mod tests {
cache.put_parquet_meta_data(file_id, metadata);
assert!(
cache
.get_parquet_meta_data(file_id, &mut metrics)
.get_parquet_meta_data(file_id, &mut metrics, Default::default())
.await
.is_none()
);
@@ -923,7 +924,7 @@ mod tests {
let file_id = RegionFileId::new(region_id, FileId::random());
assert!(
cache
.get_parquet_meta_data(file_id, &mut metrics)
.get_parquet_meta_data(file_id, &mut metrics, Default::default())
.await
.is_none()
);
@@ -931,14 +932,14 @@ mod tests {
cache.put_parquet_meta_data(file_id, metadata);
assert!(
cache
.get_parquet_meta_data(file_id, &mut metrics)
.get_parquet_meta_data(file_id, &mut metrics, Default::default())
.await
.is_some()
);
cache.remove_parquet_meta_data(file_id);
assert!(
cache
.get_parquet_meta_data(file_id, &mut metrics)
.get_parquet_meta_data(file_id, &mut metrics, Default::default())
.await
.is_none()
);

View File

@@ -16,11 +16,13 @@
use std::mem;
use parquet::basic::ColumnOrder;
use parquet::file::metadata::{
FileMetaData, ParquetColumnIndex, ParquetMetaData, ParquetOffsetIndex, RowGroupMetaData,
FileMetaData, KeyValue, ParquetColumnIndex, ParquetMetaData, ParquetOffsetIndex,
RowGroupMetaData,
};
use parquet::file::page_index::index::Index;
use parquet::format::{ColumnOrder, KeyValue, PageLocation};
use parquet::file::page_index::column_index::ColumnIndexMetaData as Index;
use parquet::file::page_index::offset_index::PageLocation;
use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
/// Returns estimated size of [ParquetMetaData].

View File

@@ -28,7 +28,7 @@ use moka::notification::RemovalCause;
use moka::policy::EvictionPolicy;
use object_store::util::join_path;
use object_store::{ErrorKind, ObjectStore, Reader};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
use snafu::ResultExt;
use store_api::storage::{FileId, RegionId};
use tokio::sync::mpsc::{Sender, UnboundedReceiver};
@@ -571,6 +571,7 @@ impl FileCache {
&self,
key: IndexKey,
cache_metrics: &mut MetadataCacheMetrics,
page_index_policy: PageIndexPolicy,
) -> Option<ParquetMetaData> {
// Check if file cache contains the key
if let Some(index_value) = self.inner.parquet_index.get(&key).await {
@@ -578,7 +579,8 @@ impl FileCache {
let local_store = self.local_store();
let file_path = self.inner.cache_file_path(key);
let file_size = index_value.file_size as u64;
let metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);
let mut metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);
metadata_loader.with_page_index_policy(page_index_policy);
match metadata_loader.load(cache_metrics).await {
Ok(metadata) => {

View File

@@ -24,6 +24,7 @@ use object_store::services::Fs;
use parquet::arrow::ArrowWriter;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::statistics::Statistics;
/// Returns a parquet meta data.
pub(crate) fn parquet_meta() -> Arc<ParquetMetaData> {
@@ -49,3 +50,60 @@ pub(crate) fn new_fs_store(path: &str) -> ObjectStore {
let builder = Fs::default();
ObjectStore::new(builder.root(path)).unwrap().finish()
}
pub(crate) fn assert_parquet_metadata_equal(x: Arc<ParquetMetaData>, y: Arc<ParquetMetaData>) {
// Normalize the statistics in parquet metadata because the flag "min_max_backwards_compatible"
// is not persisted across parquet metadata writer and reader.
fn normalize_statistics(metadata: ParquetMetaData) -> ParquetMetaData {
let unset_min_max_backwards_compatible_flag = |stats: Statistics| -> Statistics {
match stats {
Statistics::Boolean(stats) => {
Statistics::Boolean(stats.with_backwards_compatible_min_max(false))
}
Statistics::Int32(stats) => {
Statistics::Int32(stats.with_backwards_compatible_min_max(false))
}
Statistics::Int64(stats) => {
Statistics::Int64(stats.with_backwards_compatible_min_max(false))
}
Statistics::Int96(stats) => {
Statistics::Int96(stats.with_backwards_compatible_min_max(false))
}
Statistics::Float(stats) => {
Statistics::Float(stats.with_backwards_compatible_min_max(false))
}
Statistics::Double(stats) => {
Statistics::Double(stats.with_backwards_compatible_min_max(false))
}
Statistics::ByteArray(stats) => {
Statistics::ByteArray(stats.with_backwards_compatible_min_max(false))
}
Statistics::FixedLenByteArray(stats) => {
Statistics::FixedLenByteArray(stats.with_backwards_compatible_min_max(false))
}
}
};
let mut metadata_builder = metadata.into_builder();
for rg in metadata_builder.take_row_groups() {
let mut rg_builder = rg.into_builder();
for col in rg_builder.take_columns() {
let stats = col
.statistics()
.cloned()
.map(unset_min_max_backwards_compatible_flag);
let mut col_builder = col.into_builder().clear_statistics();
if let Some(stats) = stats {
col_builder = col_builder.set_statistics(stats);
}
rg_builder = rg_builder.add_column_metadata(col_builder.build().unwrap());
}
metadata_builder = metadata_builder.add_row_group(rg_builder.build().unwrap());
}
metadata_builder.build()
}
let x = normalize_statistics(Arc::unwrap_or_clone(x));
let y = normalize_statistics(Arc::unwrap_or_clone(y));
assert_eq!(x, y);
}

View File

@@ -470,11 +470,12 @@ impl UploadTracker {
mod tests {
use common_test_util::temp_dir::create_temp_dir;
use object_store::ATOMIC_WRITE_DIR;
use parquet::file::metadata::PageIndexPolicy;
use store_api::region_request::PathType;
use super::*;
use crate::access_layer::OperationType;
use crate::cache::test_util::new_fs_store;
use crate::cache::test_util::{assert_parquet_metadata_equal, new_fs_store};
use crate::cache::{CacheManager, CacheStrategy};
use crate::error::InvalidBatchSnafu;
use crate::read::Source;
@@ -482,8 +483,7 @@ mod tests {
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::test_util::TestEnv;
use crate::test_util::sst_util::{
assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle_with_file_id,
sst_region_metadata,
new_batch_by_range, new_source, sst_file_handle_with_file_id, sst_region_metadata,
};
#[tokio::test]
@@ -652,11 +652,12 @@ mod tests {
handle.clone(),
mock_store.clone(),
)
.cache(CacheStrategy::EnableAll(cache_manager.clone()));
.cache(CacheStrategy::EnableAll(cache_manager.clone()))
.page_index_policy(PageIndexPolicy::Optional);
let reader = builder.build().await.unwrap();
// Check parquet metadata
assert_parquet_metadata_eq(write_parquet_metadata, reader.parquet_metadata());
assert_parquet_metadata_equal(write_parquet_metadata, reader.parquet_metadata());
}
#[tokio::test]

View File

@@ -601,14 +601,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid file metadata"))]
ConvertMetaData {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: parquet::errors::ParquetError,
},
#[snafu(display("Column not found, column: {column}"))]
ColumnNotFound {
column: String,
@@ -1284,7 +1276,6 @@ impl ErrorExt for Error {
| Join { .. }
| WorkerStopped { .. }
| Recv { .. }
| ConvertMetaData { .. }
| DecodeWal { .. }
| ComputeArrow { .. }
| BiErrors { .. }

View File

@@ -71,7 +71,6 @@ use crate::sst::index::IndexOutput;
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
use crate::sst::{SeriesEstimator, to_sst_arrow_schema};
@@ -1197,7 +1196,7 @@ impl BulkPartEncoder {
metrics.num_rows += total_rows;
let buf = Bytes::from(buf);
let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
let parquet_metadata = Arc::new(file_metadata);
let num_series = series_estimator.finish();
Ok(Some(EncodedBulkPart {
@@ -1232,7 +1231,7 @@ impl BulkPartEncoder {
};
let buf = Bytes::from(buf);
let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);
let parquet_metadata = Arc::new(file_metadata);
Ok(Some(EncodedBulkPart {
data: buf,

View File

@@ -1174,9 +1174,8 @@ pub(crate) fn decode_primary_keys_with_counts(
let mut result: Vec<(CompositeValues, usize)> = Vec::new();
let mut prev_key: Option<u32> = None;
for i in 0..keys.len() {
let current_key = keys.value(i);
let pk_indices = keys.values();
for &current_key in pk_indices.iter().take(keys.len()) {
// Checks if current key is the same as previous key
if let Some(prev) = prev_key
&& prev == current_key

View File

@@ -115,7 +115,7 @@ mod tests {
use object_store::ObjectStore;
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::metadata::{KeyValue, PageIndexPolicy};
use parquet::file::properties::WriterProperties;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
@@ -126,6 +126,7 @@ mod tests {
use super::*;
use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
use crate::cache::test_util::assert_parquet_metadata_equal;
use crate::cache::{CacheManager, CacheStrategy, PageKey};
use crate::config::IndexConfig;
use crate::read::{BatchBuilder, BatchReader, FlatSource};
@@ -143,9 +144,9 @@ mod tests {
DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema,
};
use crate::test_util::sst_util::{
assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
new_batch_with_binary, new_batch_with_custom_sequence, new_primary_key, new_source,
new_sparse_primary_key, sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
build_test_binary_test_region_metadata, new_batch_by_range, new_batch_with_binary,
new_batch_with_custom_sequence, new_primary_key, new_source, new_sparse_primary_key,
sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
sst_region_metadata_with_encoding,
};
use crate::test_util::{TestEnv, check_reader_result};
@@ -377,11 +378,12 @@ mod tests {
PathType::Bare,
handle.clone(),
object_store,
);
)
.page_index_policy(PageIndexPolicy::Optional);
let reader = builder.build().await.unwrap();
let reader_metadata = reader.parquet_metadata();
assert_parquet_metadata_eq(writer_metadata, reader_metadata)
assert_parquet_metadata_equal(writer_metadata, reader_metadata);
}
#[tokio::test]

View File

@@ -563,9 +563,8 @@ pub(crate) fn decode_primary_keys(
// The parquet reader may read the whole dictionary page into the dictionary values, so
// we may decode many primary keys not in this batch if we decode the values array directly.
for i in 0..keys.len() {
let current_key = keys.value(i);
let pk_indices = keys.values();
for &current_key in pk_indices.iter().take(keys.len()) {
// Check if current key is the same as previous key
if let Some(prev) = prev_key
&& prev == current_key

View File

@@ -13,82 +13,11 @@
// limitations under the License.
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use bytes::Bytes;
use common_telemetry::trace;
use object_store::ObjectStore;
use parquet::basic::ColumnOrder;
use parquet::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
use parquet::format;
use parquet::schema::types::{SchemaDescriptor, from_thrift};
use snafu::ResultExt;
use crate::error;
use crate::error::Result;
// Refer to https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L74-L90
/// Convert [format::FileMetaData] to [ParquetMetaData]
pub fn parse_parquet_metadata(t_file_metadata: format::FileMetaData) -> Result<ParquetMetaData> {
let schema = from_thrift(&t_file_metadata.schema).context(error::ConvertMetaDataSnafu)?;
let schema_desc_ptr = Arc::new(SchemaDescriptor::new(schema));
let mut row_groups = Vec::with_capacity(t_file_metadata.row_groups.len());
for rg in t_file_metadata.row_groups {
row_groups.push(
RowGroupMetaData::from_thrift(schema_desc_ptr.clone(), rg)
.context(error::ConvertMetaDataSnafu)?,
);
}
let column_orders = parse_column_orders(t_file_metadata.column_orders, &schema_desc_ptr);
let file_metadata = FileMetaData::new(
t_file_metadata.version,
t_file_metadata.num_rows,
t_file_metadata.created_by,
t_file_metadata.key_value_metadata,
schema_desc_ptr,
column_orders,
);
// There may be a problem owing to lacking of column_index and offset_index,
// if we open page index in the future.
Ok(ParquetMetaData::new(file_metadata, row_groups))
}
// Port from https://github.com/apache/arrow-rs/blob/7e134f4d277c0b62c27529fc15a4739de3ad0afd/parquet/src/file/footer.rs#L106-L137
/// Parses column orders from Thrift definition.
/// If no column orders are defined, returns `None`.
fn parse_column_orders(
t_column_orders: Option<Vec<format::ColumnOrder>>,
schema_descr: &SchemaDescriptor,
) -> Option<Vec<ColumnOrder>> {
match t_column_orders {
Some(orders) => {
// Should always be the case
assert_eq!(
orders.len(),
schema_descr.num_columns(),
"Column order length mismatch"
);
let mut res = Vec::with_capacity(schema_descr.num_columns());
for (i, column) in schema_descr.columns().iter().enumerate() {
match orders[i] {
format::ColumnOrder::TYPEORDER(_) => {
let sort_order = ColumnOrder::get_sort_order(
column.logical_type(),
column.converted_type(),
column.physical_type(),
);
res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order));
}
}
}
Some(res)
}
None => None,
}
}
const FETCH_PARALLELISM: usize = 8;
pub(crate) const MERGE_GAP: usize = 512 * 1024;

View File

@@ -21,7 +21,7 @@ use futures::future::BoxFuture;
use object_store::ObjectStore;
use parquet::arrow::async_reader::MetadataFetch;
use parquet::errors::{ParquetError, Result as ParquetResult};
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
use snafu::{IntoError as _, ResultExt};
use crate::error::{self, Result};
@@ -37,6 +37,7 @@ pub(crate) struct MetadataLoader<'a> {
file_path: &'a str,
// The size of parquet file
file_size: u64,
page_index_policy: PageIndexPolicy,
}
impl<'a> MetadataLoader<'a> {
@@ -50,9 +51,14 @@ impl<'a> MetadataLoader<'a> {
object_store,
file_path,
file_size,
page_index_policy: Default::default(),
}
}
pub(crate) fn with_page_index_policy(&mut self, page_index_policy: PageIndexPolicy) {
self.page_index_policy = page_index_policy;
}
/// Get the size of parquet file. If file_size is 0, stat the object store to get the size.
async fn get_file_size(&self) -> Result<u64> {
let file_size = match self.file_size {
@@ -70,8 +76,9 @@ impl<'a> MetadataLoader<'a> {
pub async fn load(&self, cache_metrics: &mut MetadataCacheMetrics) -> Result<ParquetMetaData> {
let path = self.file_path;
let file_size = self.get_file_size().await?;
let reader =
ParquetMetaDataReader::new().with_prefetch_hint(Some(DEFAULT_PREFETCH_SIZE as usize));
let reader = ParquetMetaDataReader::new()
.with_prefetch_hint(Some(DEFAULT_PREFETCH_SIZE as usize))
.with_page_index_policy(self.page_index_policy);
let num_reads = AtomicUsize::new(0);
let bytes_read = AtomicU64::new(0);

View File

@@ -33,8 +33,7 @@ use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use parquet::file::metadata::{KeyValue, PageIndexPolicy, ParquetMetaData};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::region_request::PathType;
@@ -142,6 +141,7 @@ pub struct ParquetReaderBuilder {
pre_filter_mode: PreFilterMode,
/// Whether to decode primary key values eagerly when reading primary key format SSTs.
decode_primary_key_values: bool,
page_index_policy: PageIndexPolicy,
}
impl ParquetReaderBuilder {
@@ -172,6 +172,7 @@ impl ParquetReaderBuilder {
compaction: false,
pre_filter_mode: PreFilterMode::All,
decode_primary_key_values: false,
page_index_policy: Default::default(),
}
}
@@ -276,6 +277,12 @@ impl ParquetReaderBuilder {
self
}
#[must_use]
pub fn page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
self.page_index_policy = page_index_policy;
self
}
/// Builds a [ParquetReader].
///
/// This needs to perform IO operation.
@@ -314,7 +321,12 @@ impl ParquetReaderBuilder {
// Loads parquet metadata of the file.
let (parquet_meta, cache_miss) = self
.read_parquet_metadata(&file_path, file_size, &mut metrics.metadata_cache_metrics)
.read_parquet_metadata(
&file_path,
file_size,
&mut metrics.metadata_cache_metrics,
self.page_index_policy,
)
.await?;
// Decodes region metadata.
let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
@@ -479,6 +491,7 @@ impl ParquetReaderBuilder {
file_path: &str,
file_size: u64,
cache_metrics: &mut MetadataCacheMetrics,
page_index_policy: PageIndexPolicy,
) -> Result<(Arc<ParquetMetaData>, bool)> {
let start = Instant::now();
let _t = READ_STAGE_ELAPSED
@@ -489,7 +502,7 @@ impl ParquetReaderBuilder {
// Tries to get from cache with metrics tracking.
if let Some(metadata) = self
.cache_strategy
.get_parquet_meta_data(file_id, cache_metrics)
.get_parquet_meta_data(file_id, cache_metrics, page_index_policy)
.await
{
cache_metrics.metadata_load_cost += start.elapsed();
@@ -497,7 +510,9 @@ impl ParquetReaderBuilder {
}
// Cache miss, load metadata directly.
let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
let mut metadata_loader =
MetadataLoader::new(self.object_store.clone(), file_path, file_size);
metadata_loader.with_page_index_policy(page_index_policy);
let metadata = metadata_loader.load(cache_metrics).await?;
let metadata = Arc::new(metadata);

View File

@@ -55,7 +55,6 @@ use crate::sst::file::RegionFileId;
use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo, WriteOptions};
use crate::sst::{
DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, SeriesEstimator,
@@ -205,14 +204,12 @@ where
}
current_writer.flush().await.context(WriteParquetSnafu)?;
let file_meta = current_writer.close().await.context(WriteParquetSnafu)?;
let parquet_metadata = current_writer.close().await.context(WriteParquetSnafu)?;
let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;
// Safety: num rows > 0 so we must have min/max.
let time_range = stats.time_range.unwrap();
// convert FileMetaData to ParquetMetaData
let parquet_metadata = parse_parquet_metadata(file_meta)?;
let max_row_group_uncompressed_size: u64 = parquet_metadata
.row_groups()
.iter()

View File

@@ -23,7 +23,6 @@ use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
use datatypes::value::ValueRef;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use parquet::file::metadata::ParquetMetaData;
use store_api::metadata::{
ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
@@ -277,30 +276,6 @@ pub fn new_batch_with_binary(tags: &[&str], start: usize, end: usize) -> Batch {
builder.build().unwrap()
}
/// ParquetMetaData doesn't implement `PartialEq` trait, check internal fields manually
pub fn assert_parquet_metadata_eq(a: Arc<ParquetMetaData>, b: Arc<ParquetMetaData>) {
macro_rules! assert_metadata {
( $a:expr, $b:expr, $($method:ident,)+ ) => {
$(
assert_eq!($a.$method(), $b.$method());
)+
}
}
assert_metadata!(
a.file_metadata(),
b.file_metadata(),
version,
num_rows,
created_by,
key_value_metadata,
schema_descr,
column_orders,
);
assert_metadata!(a, b, row_groups, column_index, offset_index,);
}
/// Creates a new region metadata for testing SSTs with binary datatype.
///
/// Schema: tag_0(string), field_0(binary), ts

View File

@@ -21,6 +21,7 @@ use std::num::NonZeroU64;
use std::sync::Arc;
use common_telemetry::{info, warn};
use parquet::file::metadata::PageIndexPolicy;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
@@ -523,7 +524,11 @@ async fn edit_region(
let mut cache_metrics = Default::default();
let _ = write_cache
.file_cache()
.get_parquet_meta_data(index_key, &mut cache_metrics)
.get_parquet_meta_data(
index_key,
&mut cache_metrics,
PageIndexPolicy::Optional,
)
.await;
listener.on_file_cache_filled(index_key.file_id);

View File

@@ -33,6 +33,7 @@ use common_telemetry::{debug, tracing};
use datafusion::datasource::physical_plan::{CsvSource, FileSource, JsonSource};
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata;
use datafusion_common::config::CsvOptions;
use datafusion_expr::Expr;
use datatypes::arrow::compute::can_cast_types;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Schema, SchemaRef};
@@ -214,13 +215,15 @@ impl StatementExecutor {
.context(error::ProjectSchemaSnafu)?,
);
let csv_source = CsvSource::new(format.has_header, format.delimiter, b'"')
.with_schema(schema.clone())
let options = CsvOptions::default()
.with_has_header(format.has_header)
.with_delimiter(format.delimiter);
let csv_source = CsvSource::new(schema.clone())
.with_csv_options(options)
.with_batch_size(DEFAULT_BATCH_SIZE);
let stream = file_to_stream(
object_store,
path,
schema.clone(),
csv_source,
Some(projection),
format.compression_type,
@@ -247,13 +250,11 @@ impl StatementExecutor {
.context(error::ProjectSchemaSnafu)?,
);
let json_source = JsonSource::new()
.with_schema(schema.clone())
.with_batch_size(DEFAULT_BATCH_SIZE);
let json_source =
JsonSource::new(schema.clone()).with_batch_size(DEFAULT_BATCH_SIZE);
let stream = file_to_stream(
object_store,
path,
schema.clone(),
json_source,
Some(projection),
format.compression_type,

View File

@@ -34,7 +34,7 @@ use datafusion::physical_plan::{
RecordBatchStream, SendableRecordBatchStream,
};
use datafusion_common::DFSchema;
use datafusion_expr::EmptyRelation;
use datafusion_expr::{EmptyRelation, col};
use datatypes::arrow;
use datatypes::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
use datatypes::arrow::datatypes::{DataType, Field, SchemaRef, TimeUnit};
@@ -107,7 +107,21 @@ impl UserDefinedLogicalNodeCore for Absent {
}
fn expressions(&self) -> Vec<Expr> {
vec![]
if self.unfix.is_some() {
return vec![];
}
vec![col(&self.time_index_column)]
}
fn necessary_children_exprs(&self, _output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
if self.unfix.is_some() {
return None;
}
let input_schema = self.input.schema();
let time_index_idx = input_schema.index_of_column_by_name(None, &self.time_index_column)?;
Some(vec![vec![time_index_idx]])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {

View File

@@ -40,6 +40,7 @@ use datafusion::physical_plan::{
Partitioning, PhysicalExpr, PlanProperties, RecordBatchStream, SendableRecordBatchStream,
};
use datafusion::prelude::{Column, Expr};
use datafusion_expr::col;
use datatypes::prelude::{ConcreteDataType, DataType as GtDataType};
use datatypes::value::{OrderedF64, Value, ValueRef};
use datatypes::vectors::{Helper, MutableVector, VectorRef};
@@ -88,7 +89,45 @@ impl UserDefinedLogicalNodeCore for HistogramFold {
}
fn expressions(&self) -> Vec<Expr> {
vec![]
let mut exprs = vec![
col(&self.le_column),
col(&self.ts_column),
col(&self.field_column),
];
exprs.extend(self.input.schema().fields().iter().filter_map(|f| {
let name = f.name();
if name != &self.le_column && name != &self.ts_column && name != &self.field_column {
Some(col(name))
} else {
None
}
}));
exprs
}
fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
let input_schema = self.input.schema();
let le_column_index = input_schema.index_of_column_by_name(None, &self.le_column)?;
if output_columns.is_empty() {
let indices = (0..input_schema.fields().len()).collect::<Vec<_>>();
return Some(vec![indices]);
}
let mut necessary_indices = output_columns
.iter()
.map(|&output_column| {
if output_column < le_column_index {
output_column
} else {
output_column + 1
}
})
.collect::<Vec<_>>();
necessary_indices.push(le_column_index);
necessary_indices.sort_unstable();
necessary_indices.dedup();
Some(vec![necessary_indices])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -998,11 +1037,26 @@ mod test {
use datafusion::common::ToDFSchema;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::logical_expr::EmptyRelation;
use datafusion::prelude::SessionContext;
use datatypes::arrow_array::StringArray;
use futures::FutureExt;
use super::*;
fn project_batch(batch: &RecordBatch, indices: &[usize]) -> RecordBatch {
let fields = indices
.iter()
.map(|&idx| batch.schema().field(idx).clone())
.collect::<Vec<_>>();
let columns = indices
.iter()
.map(|&idx| batch.column(idx).clone())
.collect::<Vec<_>>();
let schema = Arc::new(Schema::new(fields));
RecordBatch::try_new(schema, columns).unwrap()
}
fn prepare_test_data() -> DataSourceExec {
let schema = Arc::new(Schema::new(vec![
Field::new("host", DataType::Utf8, true),
@@ -1190,6 +1244,100 @@ mod test {
assert_eq!(result_literal, expected);
}
#[tokio::test]
async fn pruning_should_keep_le_column_for_exec() {
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
Field::new("le", DataType::Utf8, true),
Field::new("val", DataType::Float64, true),
]));
let df_schema = schema.clone().to_dfschema_ref().unwrap();
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema,
});
let plan = HistogramFold::new(
"le".to_string(),
"val".to_string(),
"ts".to_string(),
0.5,
input,
)
.unwrap();
let output_columns = [0usize, 1usize];
let required = plan.necessary_children_exprs(&output_columns).unwrap();
let required = &required[0];
assert_eq!(required.as_slice(), &[0, 1, 2]);
let input_batch = RecordBatch::try_new(
schema,
vec![
Arc::new(TimestampMillisecondArray::from(vec![0, 0])),
Arc::new(StringArray::from(vec!["0.1", "+Inf"])),
Arc::new(Float64Array::from(vec![1.0, 2.0])),
],
)
.unwrap();
let projected = project_batch(&input_batch, required);
let projected_schema = projected.schema();
let memory_exec = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[vec![projected]], projected_schema, None).unwrap(),
)));
let fold_exec = plan.to_execution_plan(memory_exec);
let session_context = SessionContext::default();
let output_batches =
datafusion::physical_plan::collect(fold_exec, session_context.task_ctx())
.await
.unwrap();
assert_eq!(output_batches.len(), 1);
let output_batch = &output_batches[0];
assert_eq!(output_batch.num_rows(), 1);
let ts = output_batch
.column(0)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
assert_eq!(ts.values(), &[0i64]);
let values = output_batch
.column(1)
.as_any()
.downcast_ref::<Float64Array>()
.unwrap();
assert!((values.value(0) - 0.1).abs() < 1e-12);
// Simulate the pre-fix pruning behavior: omit the `le` column from the child input.
let le_index = 1usize;
let broken_required = output_columns
.iter()
.map(|&output_column| {
if output_column < le_index {
output_column
} else {
output_column + 1
}
})
.collect::<Vec<_>>();
let broken = project_batch(&input_batch, &broken_required);
let broken_schema = broken.schema();
let broken_exec = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[vec![broken]], broken_schema, None).unwrap(),
)));
let broken_fold_exec = plan.to_execution_plan(broken_exec);
let session_context = SessionContext::default();
let broken_result = std::panic::AssertUnwindSafe(async {
datafusion::physical_plan::collect(broken_fold_exec, session_context.task_ctx()).await
})
.catch_unwind()
.await;
assert!(broken_result.is_err());
}
#[test]
fn confirm_schema() {
let input_schema = Schema::new(vec![

View File

@@ -33,6 +33,7 @@ use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use datafusion_expr::col;
use datatypes::arrow::compute;
use datatypes::arrow::error::Result as ArrowResult;
use futures::{Stream, StreamExt, ready};
@@ -84,7 +85,37 @@ impl UserDefinedLogicalNodeCore for InstantManipulate {
}
fn expressions(&self) -> Vec<Expr> {
vec![]
if self.unfix.is_some() {
return vec![];
}
let mut exprs = vec![col(&self.time_index_column)];
if let Some(field) = &self.field_column {
exprs.push(col(field));
}
exprs
}
fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
if self.unfix.is_some() {
return None;
}
let input_schema = self.input.schema();
if output_columns.is_empty() {
let indices = (0..input_schema.fields().len()).collect::<Vec<_>>();
return Some(vec![indices]);
}
let mut required = output_columns.to_vec();
required.push(input_schema.index_of_column_by_name(None, &self.time_index_column)?);
if let Some(field) = &self.field_column {
required.push(input_schema.index_of_column_by_name(None, field)?);
}
required.sort_unstable();
required.dedup();
Some(vec![required])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@@ -440,8 +471,6 @@ impl InstantManipulateStream {
// refer to Go version: https://github.com/prometheus/prometheus/blob/e934d0f01158a1d55fa0ebb035346b195fcc1260/promql/engine.go#L1571
// and the function `vectorSelectorSingle`
pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
let mut take_indices = vec![];
let ts_column = input
.column(self.time_index)
.as_any()
@@ -473,6 +502,8 @@ impl InstantManipulateStream {
let aligned_start = self.start + (max_start - self.start) / self.interval * self.interval;
let aligned_end = self.end - (self.end - min_end) / self.interval * self.interval;
let mut take_indices = vec![];
let mut cursor = 0;
let aligned_ts_iter = (aligned_start..=aligned_end).step_by(self.interval as usize);
@@ -570,6 +601,8 @@ impl InstantManipulateStream {
#[cfg(test)]
mod test {
use datafusion::common::ToDFSchema;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
use datafusion::prelude::SessionContext;
use super::*;
@@ -611,6 +644,30 @@ mod test {
assert_eq!(result_literal, expected);
}
#[test]
fn pruning_should_keep_time_and_field_columns_for_exec() {
let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema,
});
let plan = InstantManipulate::new(
0,
0,
0,
0,
TIME_INDEX_COLUMN.to_string(),
Some("value".to_string()),
input,
);
// Simulate a parent projection requesting only the `path` column.
let output_columns = [2usize];
let required = plan.necessary_children_exprs(&output_columns).unwrap();
let required = &required[0];
assert_eq!(required.as_slice(), &[0, 1, 2]);
}
#[tokio::test]
async fn lookback_10s_interval_30s() {
let expected = String::from(

View File

@@ -31,6 +31,7 @@ use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
SendableRecordBatchStream,
};
use datafusion_expr::col;
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::record_batch::RecordBatch;
@@ -83,7 +84,38 @@ impl UserDefinedLogicalNodeCore for SeriesNormalize {
}
fn expressions(&self) -> Vec<datafusion::logical_expr::Expr> {
vec![]
if self.unfix.is_some() {
return vec![];
}
self.tag_columns
.iter()
.map(col)
.chain(std::iter::once(col(&self.time_index_column_name)))
.collect()
}
fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
if self.unfix.is_some() {
return None;
}
let input_schema = self.input.schema();
if output_columns.is_empty() {
let indices = (0..input_schema.fields().len()).collect::<Vec<_>>();
return Some(vec![indices]);
}
let mut required = Vec::with_capacity(output_columns.len() + 1 + self.tag_columns.len());
required.extend_from_slice(output_columns);
required.push(input_schema.index_of_column_by_name(None, &self.time_index_column_name)?);
for tag in &self.tag_columns {
required.push(input_schema.index_of_column_by_name(None, tag)?);
}
required.sort_unstable();
required.dedup();
Some(vec![required])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@@ -429,8 +461,10 @@ mod test {
use datafusion::arrow::datatypes::{
ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType,
};
use datafusion::common::ToDFSchema;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
use datafusion::prelude::SessionContext;
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow_array::StringArray;
@@ -461,6 +495,23 @@ mod test {
))
}
#[test]
fn pruning_should_keep_time_and_tag_columns_for_exec() {
let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema,
});
let plan =
SeriesNormalize::new(0, TIME_INDEX_COLUMN, true, vec!["path".to_string()], input);
// Simulate a parent projection requesting only the `value` column.
let output_columns = [1usize];
let required = plan.necessary_children_exprs(&output_columns).unwrap();
let required = &required[0];
assert_eq!(required.as_slice(), &[0, 1, 2]);
}
#[tokio::test]
async fn test_sort_record_batch() {
let memory_exec = Arc::new(prepare_test_data());

View File

@@ -18,7 +18,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use common_telemetry::debug;
use common_telemetry::{debug, warn};
use datafusion::arrow::array::{Array, ArrayRef, Int64Array, TimestampMillisecondArray};
use datafusion::arrow::compute;
use datafusion::arrow::datatypes::{Field, SchemaRef};
@@ -38,6 +38,7 @@ use datafusion::physical_plan::{
SendableRecordBatchStream, Statistics,
};
use datafusion::sql::TableReference;
use datafusion_expr::col;
use futures::{Stream, StreamExt, ready};
use greptime_proto::substrait_extension as pb;
use prost::Message;
@@ -288,7 +289,53 @@ impl UserDefinedLogicalNodeCore for RangeManipulate {
}
fn expressions(&self) -> Vec<Expr> {
vec![]
if self.unfix.is_some() {
return vec![];
}
let mut exprs = Vec::with_capacity(1 + self.field_columns.len());
exprs.push(col(&self.time_index));
exprs.extend(self.field_columns.iter().map(col));
exprs
}
fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
if self.unfix.is_some() {
return None;
}
let input_schema = self.input.schema();
let input_len = input_schema.fields().len();
let time_index_idx = input_schema.index_of_column_by_name(None, &self.time_index)?;
if output_columns.is_empty() {
let indices = (0..input_len).collect::<Vec<_>>();
return Some(vec![indices]);
}
let mut required = Vec::with_capacity(output_columns.len() + 1 + self.field_columns.len());
required.push(time_index_idx);
for value_column in &self.field_columns {
required.push(input_schema.index_of_column_by_name(None, value_column)?);
}
for &idx in output_columns {
if idx < input_len {
required.push(idx);
} else if idx == input_len {
// Derived timestamp range column.
required.push(time_index_idx);
} else {
warn!(
"Output column index {} is out of bounds for input schema with length {}",
idx, input_len
);
return None;
}
}
required.sort_unstable();
required.dedup();
Some(vec![required])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@@ -734,16 +781,31 @@ mod test {
use datafusion::common::ToDFSchema;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
use datafusion::physical_expr::Partitioning;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::prelude::SessionContext;
use datatypes::arrow::array::TimestampMillisecondArray;
use futures::FutureExt;
use super::*;
const TIME_INDEX_COLUMN: &str = "timestamp";
fn project_batch(batch: &RecordBatch, indices: &[usize]) -> RecordBatch {
let fields = indices
.iter()
.map(|&idx| batch.schema().field(idx).clone())
.collect::<Vec<_>>();
let columns = indices
.iter()
.map(|&idx| batch.column(idx).clone())
.collect::<Vec<_>>();
let schema = Arc::new(Schema::new(fields));
RecordBatch::try_new(schema, columns).unwrap()
}
fn prepare_test_data() -> DataSourceExec {
let schema = Arc::new(Schema::new(vec![
Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
@@ -844,10 +906,96 @@ mod test {
assert_eq!(result_literal, expected);
}
#[tokio::test]
async fn pruning_should_keep_time_and_value_columns_for_exec() {
let schema = Arc::new(Schema::new(vec![
Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
Field::new("value_1", DataType::Float64, true),
Field::new("value_2", DataType::Float64, true),
Field::new("path", DataType::Utf8, true),
]));
let df_schema = schema.clone().to_dfschema_ref().unwrap();
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema,
});
let plan = RangeManipulate::new(
0,
310_000,
30_000,
90_000,
TIME_INDEX_COLUMN.to_string(),
vec!["value_1".to_string(), "value_2".to_string()],
input,
)
.unwrap();
// Simulate a parent projection requesting only the `path` column.
let output_columns = [3usize];
let required = plan.necessary_children_exprs(&output_columns).unwrap();
let required = &required[0];
assert_eq!(required.as_slice(), &[0, 1, 2, 3]);
let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
0, 30_000, 60_000, 90_000, 120_000, // every 30s
180_000, 240_000, // every 60s
241_000, 271_000, 291_000, // others
])) as _;
let field_column: ArrayRef = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
let input_batch = RecordBatch::try_new(
schema,
vec![
timestamp_column,
field_column.clone(),
field_column,
path_column,
],
)
.unwrap();
let projected = project_batch(&input_batch, required);
let projected_schema = projected.schema();
let memory_exec = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[vec![projected]], projected_schema, None).unwrap(),
)));
let range_exec = plan.to_execution_plan(memory_exec);
let session_context = SessionContext::default();
let output_batches =
datafusion::physical_plan::collect(range_exec, session_context.task_ctx())
.await
.unwrap();
assert_eq!(output_batches.len(), 1);
let output_batch = &output_batches[0];
let path = output_batch
.column(3)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert!(path.iter().all(|v| v == Some("foo")));
// Simulate the pre-fix pruning behavior: omit the timestamp/value columns from the child.
let broken_required = [3usize];
let broken = project_batch(&input_batch, &broken_required);
let broken_schema = broken.schema();
let broken_exec = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[vec![broken]], broken_schema, None).unwrap(),
)));
let broken_range_exec = plan.to_execution_plan(broken_exec);
let session_context = SessionContext::default();
let broken_result = std::panic::AssertUnwindSafe(async {
datafusion::physical_plan::collect(broken_range_exec, session_context.task_ctx()).await
})
.catch_unwind()
.await;
assert!(broken_result.is_err());
}
#[tokio::test]
async fn interval_30s_range_90s() {
let expected = String::from(
"PrimitiveArray<Timestamp(Millisecond, None)>\n[\n \
"PrimitiveArray<Timestamp(ms)>\n[\n \
1970-01-01T00:00:00,\n \
1970-01-01T00:00:30,\n \
1970-01-01T00:01:00,\n \
@@ -867,7 +1015,7 @@ mod test {
ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
}\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]\n\
RangeArray { \
base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \
base array: PrimitiveArray<Timestamp(ms)>\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \
ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
}",
);
@@ -880,7 +1028,7 @@ mod test {
#[tokio::test]
async fn small_empty_range() {
let expected = String::from(
"PrimitiveArray<Timestamp(Millisecond, None)>\n[\n \
"PrimitiveArray<Timestamp(ms)>\n[\n \
1970-01-01T00:00:00.001,\n \
1970-01-01T00:00:03.001,\n \
1970-01-01T00:00:06.001,\n \
@@ -893,7 +1041,7 @@ mod test {
ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
}\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]\n\
RangeArray { \
base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \
base array: PrimitiveArray<Timestamp(ms)>\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \
ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
}",
);

View File

@@ -31,6 +31,7 @@ use datafusion::physical_plan::{
};
use datafusion::prelude::Expr;
use datafusion::sql::TableReference;
use datafusion_expr::col;
use datatypes::arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray};
use datatypes::arrow::compute::{CastOptions, cast_with_options, concat_batches};
use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
@@ -266,7 +267,36 @@ impl UserDefinedLogicalNodeCore for ScalarCalculate {
}
fn expressions(&self) -> Vec<Expr> {
vec![]
if self.unfix.is_some() {
return vec![];
}
self.tag_columns
.iter()
.map(col)
.chain(std::iter::once(col(&self.time_index)))
.chain(std::iter::once(col(&self.field_column)))
.collect()
}
fn necessary_children_exprs(&self, _output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
if self.unfix.is_some() {
return None;
}
let input_schema = self.input.schema();
let time_index_idx = input_schema.index_of_column_by_name(None, &self.time_index)?;
let field_column_idx = input_schema.index_of_column_by_name(None, &self.field_column)?;
let mut required = Vec::with_capacity(2 + self.tag_columns.len());
required.extend([time_index_idx, field_column_idx]);
for tag in &self.tag_columns {
required.push(input_schema.index_of_column_by_name(None, tag)?);
}
required.sort_unstable();
required.dedup();
Some(vec![required])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@@ -275,15 +305,9 @@ impl UserDefinedLogicalNodeCore for ScalarCalculate {
fn with_exprs_and_inputs(
&self,
exprs: Vec<Expr>,
_exprs: Vec<Expr>,
inputs: Vec<LogicalPlan>,
) -> DataFusionResult<Self> {
if !exprs.is_empty() {
return Err(DataFusionError::Internal(
"ScalarCalculate should not have any expressions".to_string(),
));
}
let input: LogicalPlan = inputs.into_iter().next().unwrap();
let input_schema = input.schema();
@@ -624,6 +648,109 @@ mod test {
use super::*;
fn project_batch(batch: &RecordBatch, indices: &[usize]) -> RecordBatch {
let fields = indices
.iter()
.map(|&idx| batch.schema().field(idx).clone())
.collect::<Vec<_>>();
let columns = indices
.iter()
.map(|&idx| batch.column(idx).clone())
.collect::<Vec<_>>();
let schema = Arc::new(Schema::new(fields));
RecordBatch::try_new(schema, columns).unwrap()
}
#[test]
fn necessary_children_exprs_preserve_tag_columns() {
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
Field::new("tag1", DataType::Utf8, true),
Field::new("tag2", DataType::Utf8, true),
Field::new("val", DataType::Float64, true),
Field::new("extra", DataType::Utf8, true),
]));
let schema = Arc::new(DFSchema::try_from(schema).unwrap());
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema,
});
let tag_columns = vec!["tag1".to_string(), "tag2".to_string()];
let plan = ScalarCalculate::new(0, 1, 1, input, "ts", &tag_columns, "val", None).unwrap();
let required = plan.necessary_children_exprs(&[0, 1]).unwrap();
assert_eq!(required, vec![vec![0, 1, 2, 3]]);
}
#[tokio::test]
async fn pruning_should_keep_tag_columns_for_exec() {
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
Field::new("tag1", DataType::Utf8, true),
Field::new("tag2", DataType::Utf8, true),
Field::new("val", DataType::Float64, true),
Field::new("extra", DataType::Utf8, true),
]));
let df_schema = Arc::new(DFSchema::try_from(schema.clone()).unwrap());
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema,
});
let tag_columns = vec!["tag1".to_string(), "tag2".to_string()];
let plan =
ScalarCalculate::new(0, 15_000, 5000, input, "ts", &tag_columns, "val", None).unwrap();
let required = plan.necessary_children_exprs(&[0, 1]).unwrap();
let required = &required[0];
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(TimestampMillisecondArray::from(vec![
0, 5_000, 10_000, 15_000,
])),
Arc::new(StringArray::from(vec!["foo", "foo", "foo", "foo"])),
Arc::new(StringArray::from(vec!["bar", "bar", "bar", "bar"])),
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
Arc::new(StringArray::from(vec!["x", "x", "x", "x"])),
],
)
.unwrap();
let projected_batch = project_batch(&batch, required);
let projected_schema = projected_batch.schema();
let memory_exec = Arc::new(DataSourceExec::new(Arc::new(
MemorySourceConfig::try_new(&[vec![projected_batch]], projected_schema, None).unwrap(),
)));
let scalar_exec = plan.to_execution_plan(memory_exec).unwrap();
let session_context = SessionContext::default();
let result = datafusion::physical_plan::collect(scalar_exec, session_context.task_ctx())
.await
.unwrap();
assert_eq!(result.len(), 1);
let batch = &result[0];
assert_eq!(batch.num_columns(), 2);
assert_eq!(batch.num_rows(), 4);
assert_eq!(batch.schema().field(0).name(), "ts");
assert_eq!(batch.schema().field(1).name(), "scalar(val)");
let ts = batch
.column(0)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
assert_eq!(ts.values(), &[0i64, 5_000, 10_000, 15_000]);
let values = batch
.column(1)
.as_any()
.downcast_ref::<Float64Array>()
.unwrap();
assert_eq!(values.values(), &[1.0f64, 2.0, 3.0, 4.0]);
}
fn prepare_test_data(series: Vec<RecordBatch>) -> DataSourceExec {
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),

View File

@@ -33,6 +33,7 @@ use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
SendableRecordBatchStream,
};
use datafusion_expr::col;
use datatypes::arrow::compute;
use datatypes::compute::SortOptions;
use futures::{Stream, StreamExt, ready};
@@ -76,7 +77,38 @@ impl UserDefinedLogicalNodeCore for SeriesDivide {
}
fn expressions(&self) -> Vec<Expr> {
vec![]
if self.unfix.is_some() {
return vec![];
}
self.tag_columns
.iter()
.map(col)
.chain(std::iter::once(col(&self.time_index_column)))
.collect()
}
fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
if self.unfix.is_some() {
return None;
}
let input_schema = self.input.schema();
if output_columns.is_empty() {
let indices = (0..input_schema.fields().len()).collect::<Vec<_>>();
return Some(vec![indices]);
}
let mut required = Vec::with_capacity(output_columns.len() + 1 + self.tag_columns.len());
required.extend_from_slice(output_columns);
for tag in &self.tag_columns {
required.push(input_schema.index_of_column_by_name(None, tag)?);
}
required.push(input_schema.index_of_column_by_name(None, &self.time_index_column)?);
required.sort_unstable();
required.dedup();
Some(vec![required])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
@@ -544,8 +576,10 @@ impl SeriesDivideStream {
#[cfg(test)]
mod test {
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::ToDFSchema;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
use datafusion::prelude::SessionContext;
use super::*;
@@ -611,6 +645,26 @@ mod test {
))
}
#[test]
fn pruning_should_keep_tags_and_time_index_columns_for_exec() {
let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
let input = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema,
});
let plan = SeriesDivide::new(
vec!["host".to_string(), "path".to_string()],
"time_index".to_string(),
input,
);
// Simulate a parent projection requesting only the `host` column.
let output_columns = [0usize];
let required = plan.necessary_children_exprs(&output_columns).unwrap();
let required = &required[0];
assert_eq!(required.as_slice(), &[0, 1, 2]);
}
#[tokio::test]
async fn overall_data() {
let memory_exec = Arc::new(prepare_test_data());

View File

@@ -32,6 +32,7 @@ use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties,
RecordBatchStream, SendableRecordBatchStream, hash_utils,
};
use datafusion_expr::col;
use datatypes::arrow::compute;
use futures::future::BoxFuture;
use futures::{Stream, StreamExt, TryStreamExt, ready};
@@ -145,7 +146,20 @@ impl UserDefinedLogicalNodeCore for UnionDistinctOn {
}
fn expressions(&self) -> Vec<Expr> {
vec![]
let mut exprs: Vec<Expr> = self.compare_keys.iter().map(col).collect();
if !self.compare_keys.iter().any(|key| key == &self.ts_col) {
exprs.push(col(&self.ts_col));
}
exprs
}
fn necessary_children_exprs(&self, _output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
let left_len = self.left.schema().fields().len();
let right_len = self.right.schema().fields().len();
Some(vec![
(0..left_len).collect::<Vec<_>>(),
(0..right_len).collect::<Vec<_>>(),
])
}
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -540,9 +554,43 @@ mod test {
use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::ToDFSchema;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
use super::*;
#[test]
fn pruning_should_keep_all_columns_for_exec() {
let schema = Arc::new(Schema::new(vec![
Field::new("ts", DataType::Int32, false),
Field::new("k", DataType::Int32, false),
Field::new("v", DataType::Int32, false),
]));
let df_schema = schema.to_dfschema_ref().unwrap();
let left = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema.clone(),
});
let right = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: df_schema.clone(),
});
let plan = UnionDistinctOn::new(
left,
right,
vec!["k".to_string()],
"ts".to_string(),
df_schema,
);
// Simulate a parent projection requesting only one output column.
let output_columns = [2usize];
let required = plan.necessary_children_exprs(&output_columns).unwrap();
assert_eq!(required.len(), 2);
assert_eq!(required[0].as_slice(), &[0, 1, 2]);
assert_eq!(required[1].as_slice(), &[0, 1, 2]);
}
#[test]
fn test_interleave_batches() {
let schema = Schema::new(vec![

View File

@@ -255,9 +255,9 @@ fn metrics_to_string(metrics: RecordBatchMetrics, format: AnalyzeFormat) -> DfRe
match format {
AnalyzeFormat::JSON => Ok(JsonMetrics::from_record_batch_metrics(metrics).to_string()),
AnalyzeFormat::TEXT => Ok(metrics.to_string()),
AnalyzeFormat::GRAPHVIZ => Err(DataFusionError::NotImplemented(
"GRAPHVIZ format is not supported for metrics output".to_string(),
)),
format => Err(DataFusionError::NotImplemented(format!(
"AnalyzeFormat {format}",
))),
}
}

View File

@@ -316,18 +316,15 @@ impl DatafusionQueryEngine {
return state
.create_physical_plan(logical_plan)
.await
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu);
.map_err(Into::into);
}
// analyze first
let analyzed_plan = state
.analyzer()
.execute_and_check(logical_plan.clone(), state.config_options(), |_, _| {})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let analyzed_plan = state.analyzer().execute_and_check(
logical_plan.clone(),
state.config_options(),
|_, _| {},
)?;
logger.after_analyze = Some(analyzed_plan.clone());
@@ -341,10 +338,7 @@ impl DatafusionQueryEngine {
} else {
state
.optimizer()
.optimize(analyzed_plan, state, |_, _| {})
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?
.optimize(analyzed_plan, state, |_, _| {})?
};
common_telemetry::debug!("Create physical plan, optimized plan: {optimized_plan}");
@@ -371,19 +365,10 @@ impl DatafusionQueryEngine {
// Optimized by extension rules
let optimized_plan = self
.state
.optimize_by_extension_rules(plan.clone(), context)
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
.optimize_by_extension_rules(plan.clone(), context)?;
// Optimized by datafusion optimizer
let optimized_plan = self
.state
.session_state()
.optimize(&optimized_plan)
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let optimized_plan = self.state.session_state().optimize(&optimized_plan)?;
Ok(optimized_plan)
}
@@ -516,11 +501,7 @@ impl QueryEngine for DatafusionQueryEngine {
}
fn read_table(&self, table: TableRef) -> Result<DataFrame> {
self.state
.read_table(table)
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)
self.state.read_table(table).map_err(Into::into)
}
fn engine_context(&self, query_ctx: QueryContextRef) -> QueryEngineContext {
@@ -543,7 +524,8 @@ impl QueryEngine for DatafusionQueryEngine {
}
// configure execution options
state.config_mut().options_mut().execution.time_zone = query_ctx.timezone().to_string();
state.config_mut().options_mut().execution.time_zone =
Some(query_ctx.timezone().to_string());
// usually it's impossible to have both `set variable` set by sql client and
// hint in header by grpc client, so only need to deal with them separately
@@ -619,11 +601,7 @@ impl QueryExecutor for DatafusionQueryEngine {
Ok(Box::pin(EmptyRecordBatchStream::new(schema)))
}
1 => {
let df_stream = plan
.execute(0, task_ctx)
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let df_stream = plan.execute(0, task_ctx)?;
let mut stream = RecordBatchStreamAdapter::try_new_with_span(df_stream, span)
.context(error::ConvertDfRecordBatchStreamSnafu)
.map_err(BoxedError::new)
@@ -652,11 +630,7 @@ impl QueryExecutor for DatafusionQueryEngine {
.output_partitioning()
.partition_count()
);
let df_stream = merged_plan
.execute(0, task_ctx)
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let df_stream = merged_plan.execute(0, task_ctx)?;
let mut stream = RecordBatchStreamAdapter::try_new_with_span(df_stream, span)
.context(error::ConvertDfRecordBatchStreamSnafu)
.map_err(BoxedError::new)

View File

@@ -25,7 +25,7 @@ use snafu::{Location, Snafu};
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum InnerError {
#[snafu(display("DataFusion error"))]
#[snafu(transparent)]
Datafusion {
#[snafu(source)]
error: DataFusionError,

View File

@@ -1170,7 +1170,7 @@ fn test_simplify_select_now_expression() {
let expected = [
"Projection: now()",
" MergeScan [is_placeholder=false, remote_input=[",
r#"Projection: TimestampNanosecond(<TIME>, Some("+00:00")) AS now()"#,
r#"Projection: TimestampNanosecond(<TIME>, None) AS now()"#,
" TableScan: t",
"]]",
]

View File

@@ -143,7 +143,7 @@ mod tests {
let plan = create_test_plan_with_project(proj);
let result = StringNormalizationRule.analyze(plan, config).unwrap();
let expected = format!(
"Projection: CAST(Utf8(\"2017-07-23 13:10:11\") AS Timestamp({:#?}, None))\n TableScan: t",
"Projection: CAST(Utf8(\"2017-07-23 13:10:11\") AS Timestamp({}))\n TableScan: t",
time_unit
);
assert_eq!(expected, result.to_string());
@@ -162,7 +162,7 @@ mod tests {
.analyze(int_to_timestamp_plan, config)
.unwrap();
let expected = String::from(
"Projection: CAST(Int64(158412331400600000) AS Timestamp(Nanosecond, None))\n TableScan: t",
"Projection: CAST(Int64(158412331400600000) AS Timestamp(ns))\n TableScan: t",
);
assert_eq!(expected, result.to_string());

View File

@@ -4687,11 +4687,11 @@ mod test {
assert_eq!(
plan.display_indent_schema().to_string(),
"PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
\n PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-1000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
);
let plan = PromPlanner::stmt_to_plan(
DfTableSourceProvider::new(
@@ -4717,14 +4717,14 @@ mod test {
assert_eq!(
plan.display_indent_schema().to_string(),
"Filter: prom_avg_over_time(timestamp_range,field) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
\n Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
\n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(Millisecond, None)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
\n Projection: metrics.timestamp, prom_avg_over_time(timestamp_range, field) AS prom_avg_over_time(timestamp_range,field), metrics.tag [timestamp:Timestamp(Millisecond, None), prom_avg_over_time(timestamp_range,field):Float64;N, tag:Utf8]\
\n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[5000], time index=[timestamp], values=[\"field\"] [field:Dictionary(Int64, Float64);N, tag:Utf8, timestamp:Timestamp(Millisecond, None), timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-6000, None) AND metrics.timestamp <= TimestampMillisecond(100001000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(Millisecond, None)]\
\n TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
);
}

View File

@@ -123,7 +123,7 @@ tokio-rustls.workspace = true
tokio-stream = { workspace = true, features = ["net"] }
tokio-util.workspace = true
tonic.workspace = true
tonic-reflection = "0.13"
tonic-reflection = "0.14"
tower = { workspace = true, features = ["full"] }
tower-http = { version = "0.6", features = ["full"] }
tracing.workspace = true

View File

@@ -715,7 +715,7 @@ fn replace_params_with_values(
if let Some(Some(t)) = param_types.get(&format_placeholder(i + 1)) {
let value = helper::convert_value(param, t)?;
values.push(value);
values.push(value.into());
}
}
@@ -744,7 +744,7 @@ fn replace_params_with_exprs(
if let Some(Some(t)) = param_types.get(&format_placeholder(i + 1)) {
let value = helper::convert_expr_to_scalar_value(param, t)?;
values.push(value);
values.push(value.into());
}
}

View File

@@ -13,8 +13,10 @@
// limitations under the License.
use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::Duration;
use arrow_schema::Field;
use chrono::NaiveDate;
use common_query::prelude::ScalarValue;
use common_sql::convert::sql_value_to_value;
@@ -87,8 +89,8 @@ pub fn fix_placeholder_types(plan: &mut LogicalPlan) -> Result<()> {
let give_placeholder_types = |mut e: datafusion_expr::Expr| {
if let datafusion_expr::Expr::Cast(cast) = &mut e {
if let datafusion_expr::Expr::Placeholder(ph) = &mut *cast.expr {
if ph.data_type.is_none() {
ph.data_type = Some(cast.data_type.clone());
if ph.field.is_none() {
ph.field = Some(Arc::new(Field::new("", cast.data_type.clone(), true)));
common_telemetry::debug!(
"give placeholder type {:?} to {:?}",
cast.data_type,

View File

@@ -324,11 +324,12 @@ impl ExtendedQueryHandler for PostgresServerHandlerInner {
}
let output = if let Some(plan) = &sql_plan.plan {
let values = parameters_to_scalar_values(plan, portal)?;
let plan = plan
.clone()
.replace_params_with_values(&ParamValues::List(parameters_to_scalar_values(
plan, portal,
)?))
.replace_params_with_values(&ParamValues::List(
values.into_iter().map(Into::into).collect(),
))
.context(DataFusionSnafu)
.map_err(convert_err)?;
self.query_handler

View File

@@ -225,7 +225,7 @@ impl QueryContext {
/// Create a new datafusion's ConfigOptions instance based on the current QueryContext.
pub fn create_config_options(&self) -> ConfigOptions {
let mut config = ConfigOptions::default();
config.execution.time_zone = self.timezone().to_string();
config.execution.time_zone = Some(self.timezone().to_string());
config
}

View File

@@ -271,7 +271,7 @@ pub fn sql_data_type_to_concrete_data_type(
})?
.map(|t| ConcreteDataType::timestamp_datatype(t.unit()))
.unwrap_or(ConcreteDataType::timestamp_millisecond_datatype())),
SqlDataType::Interval => Ok(ConcreteDataType::interval_month_day_nano_datatype()),
SqlDataType::Interval { .. } => Ok(ConcreteDataType::interval_month_day_nano_datatype()),
SqlDataType::Decimal(exact_info) => match exact_info {
ExactNumberInfo::None => Ok(ConcreteDataType::decimal128_default_datatype()),
// refer to https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html
@@ -333,7 +333,7 @@ pub fn concrete_data_type_to_sql_data_type(data_type: &ConcreteDataType) -> Resu
ConcreteDataType::Int8(_) => Ok(SqlDataType::TinyInt(None)),
ConcreteDataType::UInt8(_) => Ok(SqlDataType::TinyIntUnsigned(None)),
ConcreteDataType::String(_) => Ok(SqlDataType::String(None)),
ConcreteDataType::Float32(_) => Ok(SqlDataType::Float(None)),
ConcreteDataType::Float32(_) => Ok(SqlDataType::Float(ExactNumberInfo::None)),
ConcreteDataType::Float64(_) => Ok(SqlDataType::Double(ExactNumberInfo::None)),
ConcreteDataType::Boolean(_) => Ok(SqlDataType::Boolean),
ConcreteDataType::Date(_) => Ok(SqlDataType::Date),
@@ -345,10 +345,13 @@ pub fn concrete_data_type_to_sql_data_type(data_type: &ConcreteDataType) -> Resu
Some(time_type.precision()),
TimezoneInfo::None,
)),
ConcreteDataType::Interval(_) => Ok(SqlDataType::Interval),
ConcreteDataType::Interval(_) => Ok(SqlDataType::Interval {
fields: None,
precision: None,
}),
ConcreteDataType::Binary(_) => Ok(SqlDataType::Varbinary(None)),
ConcreteDataType::Decimal128(d) => Ok(SqlDataType::Decimal(
ExactNumberInfo::PrecisionAndScale(d.precision() as u64, d.scale() as u64),
ExactNumberInfo::PrecisionAndScale(d.precision() as u64, d.scale() as i64),
)),
ConcreteDataType::Json(_) => Ok(SqlDataType::JSON),
ConcreteDataType::Vector(v) => Ok(SqlDataType::Custom(
@@ -412,7 +415,7 @@ mod tests {
ConcreteDataType::string_datatype(),
);
check_type(
SqlDataType::Float(None),
SqlDataType::Float(ExactNumberInfo::None),
ConcreteDataType::float32_datatype(),
);
check_type(
@@ -450,7 +453,10 @@ mod tests {
ConcreteDataType::timestamp_microsecond_datatype(),
);
check_type(
SqlDataType::Interval,
SqlDataType::Interval {
fields: None,
precision: None,
},
ConcreteDataType::interval_month_day_nano_datatype(),
);
check_type(SqlDataType::JSON, ConcreteDataType::json_datatype());

View File

@@ -114,7 +114,7 @@ impl TransformRule for ExpandIntervalTransformRule {
kind,
format,
} => {
if DataType::Interval == *data_type {
if matches!(data_type, DataType::Interval { .. }) {
match &**cast_exp {
Expr::Value(ValueWithSpan {
value: Value::SingleQuotedString(value),
@@ -129,7 +129,7 @@ impl TransformRule for ExpandIntervalTransformRule {
*expr = Expr::Cast {
kind: kind.clone(),
expr: single_quoted_string_expr(interval_value),
data_type: DataType::Interval,
data_type: data_type.clone(),
format: std::mem::take(format),
}
}
@@ -392,7 +392,10 @@ mod tests {
let mut cast_to_interval_expr = Expr::Cast {
expr: single_quoted_string_expr("3y2mon".to_string()),
data_type: DataType::Interval,
data_type: DataType::Interval {
fields: None,
precision: None,
},
format: None,
kind: sqlparser::ast::CastKind::Cast,
};
@@ -407,7 +410,10 @@ mod tests {
expr: Box::new(Expr::Value(
Value::SingleQuotedString("3 years 2 months".to_string()).into()
)),
data_type: DataType::Interval,
data_type: DataType::Interval {
fields: None,
precision: None,
},
format: None,
}
);

View File

@@ -178,9 +178,9 @@ pub(crate) fn get_type_by_alias(data_type: &DataType) -> Option<DataType> {
DataType::UInt16 => Some(DataType::SmallIntUnsigned(None)),
DataType::UInt32 => Some(DataType::IntUnsigned(None)),
DataType::UInt64 => Some(DataType::BigIntUnsigned(None)),
DataType::Float4 => Some(DataType::Float(None)),
DataType::Float4 => Some(DataType::Float(ExactNumberInfo::None)),
DataType::Float8 => Some(DataType::Double(ExactNumberInfo::None)),
DataType::Float32 => Some(DataType::Float(None)),
DataType::Float32 => Some(DataType::Float(ExactNumberInfo::None)),
DataType::Float64 => Some(DataType::Double(ExactNumberInfo::None)),
DataType::Bool => Some(DataType::Boolean),
DataType::Datetime(_) => Some(DataType::Timestamp(Some(6), TimezoneInfo::None)),
@@ -222,9 +222,9 @@ pub(crate) fn get_data_type_by_alias_name(name: &str) -> Option<DataType> {
"UINT16" => Some(DataType::SmallIntUnsigned(None)),
"UINT32" => Some(DataType::IntUnsigned(None)),
"UINT64" => Some(DataType::BigIntUnsigned(None)),
"FLOAT4" => Some(DataType::Float(None)),
"FLOAT4" => Some(DataType::Float(ExactNumberInfo::None)),
"FLOAT8" => Some(DataType::Double(ExactNumberInfo::None)),
"FLOAT32" => Some(DataType::Float(None)),
"FLOAT32" => Some(DataType::Float(ExactNumberInfo::None)),
"FLOAT64" => Some(DataType::Double(ExactNumberInfo::None)),
// String type alias
"TINYTEXT" | "MEDIUMTEXT" | "LONGTEXT" => Some(DataType::Text),
@@ -256,7 +256,7 @@ mod tests {
);
assert_eq!(
get_data_type_by_alias_name("float32"),
Some(DataType::Float(None))
Some(DataType::Float(ExactNumberInfo::None))
);
assert_eq!(
get_data_type_by_alias_name("float8"),
@@ -264,7 +264,7 @@ mod tests {
);
assert_eq!(
get_data_type_by_alias_name("float4"),
Some(DataType::Float(None))
Some(DataType::Float(ExactNumberInfo::None))
);
assert_eq!(
get_data_type_by_alias_name("int8"),
@@ -370,7 +370,7 @@ mod tests {
match &stmts[0] {
Statement::Query(q) => assert_eq!(
format!(
"SELECT arrow_cast(TIMESTAMP '2020-01-01 01:23:45.12345678', 'Timestamp({expected}, None)')"
"SELECT arrow_cast(TIMESTAMP '2020-01-01 01:23:45.12345678', 'Timestamp({expected})')"
),
q.to_string()
),
@@ -402,19 +402,19 @@ mod tests {
#[test]
fn test_transform_timestamp_alias() {
// Timestamp[Second | Millisecond | Microsecond | Nanosecond]
test_timestamp_alias("TimestampSecond", "Second");
test_timestamp_alias("Timestamp_s", "Second");
test_timestamp_alias("TimestampMillisecond", "Millisecond");
test_timestamp_alias("Timestamp_ms", "Millisecond");
test_timestamp_alias("TimestampMicrosecond", "Microsecond");
test_timestamp_alias("Timestamp_us", "Microsecond");
test_timestamp_alias("TimestampNanosecond", "Nanosecond");
test_timestamp_alias("Timestamp_ns", "Nanosecond");
test_timestamp_alias("TimestampSecond", "s");
test_timestamp_alias("Timestamp_s", "s");
test_timestamp_alias("TimestampMillisecond", "ms");
test_timestamp_alias("Timestamp_ms", "ms");
test_timestamp_alias("TimestampMicrosecond", "µs");
test_timestamp_alias("Timestamp_us", "µs");
test_timestamp_alias("TimestampNanosecond", "ns");
test_timestamp_alias("Timestamp_ns", "ns");
// Timestamp(precision)
test_timestamp_precision_type(0, "Second");
test_timestamp_precision_type(3, "Millisecond");
test_timestamp_precision_type(6, "Microsecond");
test_timestamp_precision_type(9, "Nanosecond");
test_timestamp_precision_type(0, "s");
test_timestamp_precision_type(3, "ms");
test_timestamp_precision_type(6, "µs");
test_timestamp_precision_type(9, "ns");
}
#[test]

View File

@@ -425,7 +425,7 @@ mod tests {
.unwrap();
let serialized = serde_json::to_string(&expr).unwrap();
let expected = r#"{"table_name":{"value":"quasi","quote_style":null},"columns":[{"name":{"value":"mOLEsTIAs","quote_style":null},"column_type":{"Float64":{}},"options":["PrimaryKey","Null"]},{"name":{"value":"CUMQUe","quote_style":null},"column_type":{"Timestamp":{"Second":null}},"options":["TimeIndex"]},{"name":{"value":"NaTus","quote_style":null},"column_type":{"Int64":{}},"options":[]},{"name":{"value":"EXPeDITA","quote_style":null},"column_type":{"Float64":{}},"options":[]},{"name":{"value":"ImPEDiT","quote_style":null},"column_type":{"Float32":{}},"options":[{"DefaultValue":{"Float32":0.56425774}}]},{"name":{"value":"ADIpisci","quote_style":null},"column_type":{"Float32":{}},"options":["PrimaryKey"]},{"name":{"value":"deBITIs","quote_style":null},"column_type":{"Float32":{}},"options":[{"DefaultValue":{"Float32":0.31315368}}]},{"name":{"value":"toTaM","quote_style":null},"column_type":{"Int32":{}},"options":["NotNull"]},{"name":{"value":"QuI","quote_style":null},"column_type":{"Float32":{}},"options":[{"DefaultValue":{"Float32":0.39941502}}]},{"name":{"value":"INVeNtOre","quote_style":null},"column_type":{"Boolean":null},"options":["PrimaryKey"]}],"if_not_exists":true,"partition":{"columns":["mOLEsTIAs"],"exprs":[{"lhs":{"Column":"mOLEsTIAs"},"op":"Lt","rhs":{"Value":{"Float64":5.992310449541053e307}}},{"lhs":{"Expr":{"lhs":{"Column":"mOLEsTIAs"},"op":"GtEq","rhs":{"Value":{"Float64":5.992310449541053e307}}}},"op":"And","rhs":{"Expr":{"lhs":{"Column":"mOLEsTIAs"},"op":"Lt","rhs":{"Value":{"Float64":1.1984620899082105e308}}}}},{"lhs":{"Column":"mOLEsTIAs"},"op":"GtEq","rhs":{"Value":{"Float64":1.1984620899082105e308}}}]},"engine":"mito2","options":{},"primary_keys":[0,5,9]}"#;
let expected = r#"{"table_name":{"value":"quasi","quote_style":null},"columns":[{"name":{"value":"mOLEsTIAs","quote_style":null},"column_type":{"Float64":{}},"options":["PrimaryKey","Null"]},{"name":{"value":"CUMQUe","quote_style":null},"column_type":{"Timestamp":{"Second":null}},"options":["TimeIndex"]},{"name":{"value":"NaTus","quote_style":null},"column_type":{"Int64":{}},"options":[]},{"name":{"value":"EXPeDITA","quote_style":null},"column_type":{"Float64":{}},"options":[]},{"name":{"value":"ImPEDiT","quote_style":null},"column_type":{"Float32":{}},"options":[{"DefaultValue":{"Float32":0.56425774}}]},{"name":{"value":"ADIpisci","quote_style":null},"column_type":{"Float32":{}},"options":["PrimaryKey"]},{"name":{"value":"deBITIs","quote_style":null},"column_type":{"Float32":{}},"options":[{"DefaultValue":{"Float32":0.31315368}}]},{"name":{"value":"toTaM","quote_style":null},"column_type":{"Int32":{}},"options":["NotNull"]},{"name":{"value":"QuI","quote_style":null},"column_type":{"Float32":{}},"options":[{"DefaultValue":{"Float32":0.39941502}}]},{"name":{"value":"INVeNtOre","quote_style":null},"column_type":{"Boolean":null},"options":["PrimaryKey"]}],"if_not_exists":true,"partition":{"columns":["mOLEsTIAs"],"exprs":[{"lhs":{"Column":"mOLEsTIAs"},"op":"Lt","rhs":{"Value":{"Float64":5.992310449541053e+307}}},{"lhs":{"Expr":{"lhs":{"Column":"mOLEsTIAs"},"op":"GtEq","rhs":{"Value":{"Float64":5.992310449541053e+307}}}},"op":"And","rhs":{"Expr":{"lhs":{"Column":"mOLEsTIAs"},"op":"Lt","rhs":{"Value":{"Float64":1.1984620899082105e+308}}}}},{"lhs":{"Column":"mOLEsTIAs"},"op":"GtEq","rhs":{"Value":{"Float64":1.1984620899082105e+308}}}]},"engine":"mito2","options":{},"primary_keys":[0,5,9]}"#;
assert_eq!(expected, serialized);
}

View File

@@ -544,8 +544,7 @@ tql explain (1752591864, 1752592164, '30s') sum by (a, b, c) (rate(aggr_optimize
| | PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [true] |
| | PromSeriesDivide: tags=["a", "b", "c", "d"] |
| | Sort: aggr_optimize_not_count.a ASC NULLS FIRST, aggr_optimize_not_count.b ASC NULLS FIRST, aggr_optimize_not_count.c ASC NULLS FIRST, aggr_optimize_not_count.d ASC NULLS FIRST, aggr_optimize_not_count.greptime_timestamp ASC NULLS FIRST |
| | Projection: aggr_optimize_not_count.a, aggr_optimize_not_count.b, aggr_optimize_not_count.c, aggr_optimize_not_count.d, aggr_optimize_not_count.greptime_timestamp, aggr_optimize_not_count.greptime_value |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | MergeScan [is_placeholder=false, remote_input=[ |
| | Filter: aggr_optimize_not_count.greptime_timestamp >= TimestampMillisecond(-420000, None) AND aggr_optimize_not_count.greptime_timestamp <= TimestampMillisecond(300000, None) |
| | TableScan: aggr_optimize_not_count |
| | ]] |

File diff suppressed because one or more lines are too long

View File

@@ -152,7 +152,7 @@ SELECT * FROM integers i1 WHERE NOT EXISTS(SELECT i FROM integers WHERE i=i1.i)
SELECT i1.i,i2.i FROM integers i1, integers i2 WHERE i1.i=(SELECT i FROM integers WHERE i1.i=i) AND i1.i=i2.i ORDER BY i1.i;
Error: 3001(EngineExecuteQuery), DataFusion error: Error during planning: Correlated scalar subquery must be aggregated to return at most one row
Error: 3001(EngineExecuteQuery), Error during planning: Correlated scalar subquery must be aggregated to return at most one row
SELECT * FROM (SELECT i1.i AS a, i2.i AS b FROM integers i1, integers i2) a1 WHERE a=b ORDER BY 1;

View File

@@ -2,103 +2,103 @@
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
explain select * from numbers;
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: numbers.number |
| | TableScan: numbers |
| | ]] |
| physical_plan | CooperativeExec |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: numbers.number |
| | TableScan: numbers |
| | ]] |
| physical_plan | CooperativeExec |
| | RepartitionExec: partitioning=REDACTED
| | CooperativeExec |
| | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {"greptime:version": "0"} }] |
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| | CooperativeExec |
| | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32 }], metadata: {"greptime:version": "0"} }] |
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
explain select * from numbers order by number desc;
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Sort: numbers.number DESC NULLS FIRST |
| | Projection: numbers.number |
| | TableScan: numbers |
| | ]] |
| physical_plan | SortPreservingMergeExec: [number@0 DESC] |
| | CooperativeExec |
| | SortExec: expr=[number@0 DESC], preserve_partitioning=[true] |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Sort: numbers.number DESC NULLS FIRST |
| | Projection: numbers.number |
| | TableScan: numbers |
| | ]] |
| physical_plan | SortPreservingMergeExec: [number@0 DESC] |
| | CooperativeExec |
| | SortExec: expr=[number@0 DESC], preserve_partitioning=[true] |
| | RepartitionExec: partitioning=REDACTED
| | CooperativeExec |
| | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {"greptime:version": "0"} }] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| | CooperativeExec |
| | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32 }], metadata: {"greptime:version": "0"} }] |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
explain select * from numbers order by number asc;
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Sort: numbers.number ASC NULLS LAST |
| | Projection: numbers.number |
| | TableScan: numbers |
| | ]] |
| physical_plan | SortPreservingMergeExec: [number@0 ASC NULLS LAST] |
| | CooperativeExec |
| | SortExec: expr=[number@0 ASC NULLS LAST], preserve_partitioning=[true] |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Sort: numbers.number ASC NULLS LAST |
| | Projection: numbers.number |
| | TableScan: numbers |
| | ]] |
| physical_plan | SortPreservingMergeExec: [number@0 ASC NULLS LAST] |
| | CooperativeExec |
| | SortExec: expr=[number@0 ASC NULLS LAST], preserve_partitioning=[true] |
| | RepartitionExec: partitioning=REDACTED
| | CooperativeExec |
| | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {"greptime:version": "0"} }] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| | CooperativeExec |
| | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32 }], metadata: {"greptime:version": "0"} }] |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
explain select * from numbers order by number desc limit 10;
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Limit: skip=0, fetch=10 |
| | Sort: numbers.number DESC NULLS FIRST |
| | Projection: numbers.number |
| | TableScan: numbers |
| | ]] |
| physical_plan | SortPreservingMergeExec: [number@0 DESC], fetch=10 |
| | CooperativeExec |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Limit: skip=0, fetch=10 |
| | Sort: numbers.number DESC NULLS FIRST |
| | Projection: numbers.number |
| | TableScan: numbers |
| | ]] |
| physical_plan | SortPreservingMergeExec: [number@0 DESC], fetch=10 |
| | CooperativeExec |
| | RepartitionExec: partitioning=REDACTED
| | SortExec: TopK(fetch=10), expr=[number@0 DESC], preserve_partitioning=[false] |
| | CooperativeExec |
| | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {"greptime:version": "0"} }] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| | SortExec: TopK(fetch=10), expr=[number@0 DESC], preserve_partitioning=[false] |
| | CooperativeExec |
| | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32 }], metadata: {"greptime:version": "0"} }] |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
explain select * from numbers order by number asc limit 10;
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Limit: skip=0, fetch=10 |
| | Sort: numbers.number ASC NULLS LAST |
| | Projection: numbers.number |
| | TableScan: numbers |
| | ]] |
| physical_plan | SortPreservingMergeExec: [number@0 ASC NULLS LAST], fetch=10 |
| | CooperativeExec |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Limit: skip=0, fetch=10 |
| | Sort: numbers.number ASC NULLS LAST |
| | Projection: numbers.number |
| | TableScan: numbers |
| | ]] |
| physical_plan | SortPreservingMergeExec: [number@0 ASC NULLS LAST], fetch=10 |
| | CooperativeExec |
| | RepartitionExec: partitioning=REDACTED
| | SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST], preserve_partitioning=[false] |
| | CooperativeExec |
| | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {"greptime:version": "0"} }] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| | SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST], preserve_partitioning=[false] |
| | CooperativeExec |
| | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32 }], metadata: {"greptime:version": "0"} }] |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

View File

@@ -38,7 +38,7 @@ Affected Rows: 3
SELECT AVG(i), AVG(1), AVG(DISTINCT i), AVG(NULL) FROM integers;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Execution error: Function 'avg' user-defined coercion failed with "Error during planning: The function \"avg\" does not support inputs of type Null." No function matches the given name and argument types 'avg(Null)'. You might need to add explicit type casts.
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Execution error: Function 'avg' user-defined coercion failed with "Error during planning: Avg does not support inputs of type Null." No function matches the given name and argument types 'avg(Null)'. You might need to add explicit type casts.
Candidate functions:
avg(UserDefined)

View File

@@ -1,19 +1,19 @@
--- date_add ---
SELECT date_add('2023-12-06 07:39:46.222'::TIMESTAMP_MS, INTERVAL '5 day');
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| date_add(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(Millisecond, None)")),IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 5, nanoseconds: 0 }")) |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 2023-12-11T07:39:46.222 |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| date_add(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(ms)")),IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 5, nanoseconds: 0 }")) |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 2023-12-11T07:39:46.222 |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
SELECT date_add('2023-12-06 07:39:46.222'::TIMESTAMP_MS, '5 day');
+----------------------------------------------------------------------------------------------------------+
| date_add(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(Millisecond, None)")),Utf8("5 day")) |
+----------------------------------------------------------------------------------------------------------+
| 2023-12-11T07:39:46.222 |
+----------------------------------------------------------------------------------------------------------+
+-------------------------------------------------------------------------------------------+
| date_add(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(ms)")),Utf8("5 day")) |
+-------------------------------------------------------------------------------------------+
| 2023-12-11T07:39:46.222 |
+-------------------------------------------------------------------------------------------+
SELECT date_add('2023-12-06'::DATE, INTERVAL '3 month 5 day');
@@ -34,19 +34,19 @@ SELECT date_add('2023-12-06'::DATE, '3 month 5 day');
--- date_sub ---
SELECT date_sub('2023-12-06 07:39:46.222'::TIMESTAMP_MS, INTERVAL '5 day');
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| date_sub(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(Millisecond, None)")),IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 5, nanoseconds: 0 }")) |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 2023-12-01T07:39:46.222 |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| date_sub(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(ms)")),IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 5, nanoseconds: 0 }")) |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 2023-12-01T07:39:46.222 |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
SELECT date_sub('2023-12-06 07:39:46.222'::TIMESTAMP_MS, '5 day');
+----------------------------------------------------------------------------------------------------------+
| date_sub(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(Millisecond, None)")),Utf8("5 day")) |
+----------------------------------------------------------------------------------------------------------+
| 2023-12-01T07:39:46.222 |
+----------------------------------------------------------------------------------------------------------+
+-------------------------------------------------------------------------------------------+
| date_sub(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(ms)")),Utf8("5 day")) |
+-------------------------------------------------------------------------------------------+
| 2023-12-01T07:39:46.222 |
+-------------------------------------------------------------------------------------------+
SELECT date_sub('2023-12-06'::DATE, INTERVAL '3 month 5 day');
@@ -67,28 +67,28 @@ SELECT date_sub('2023-12-06'::DATE, '3 month 5 day');
--- date_format ---
SELECT date_format('2023-12-06 07:39:46.222'::TIMESTAMP_MS, '%Y-%m-%d %H:%M:%S:%3f');
+-----------------------------------------------------------------------------------------------------------------------------+
| date_format(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(Millisecond, None)")),Utf8("%Y-%m-%d %H:%M:%S:%3f")) |
+-----------------------------------------------------------------------------------------------------------------------------+
| 2023-12-06 07:39:46:222 |
+-----------------------------------------------------------------------------------------------------------------------------+
+--------------------------------------------------------------------------------------------------------------+
| date_format(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(ms)")),Utf8("%Y-%m-%d %H:%M:%S:%3f")) |
+--------------------------------------------------------------------------------------------------------------+
| 2023-12-06 07:39:46:222 |
+--------------------------------------------------------------------------------------------------------------+
SELECT date_format('2023-12-06 07:39:46.222'::TIMESTAMP_S, '%Y-%m-%d %H:%M:%S:%3f');
+------------------------------------------------------------------------------------------------------------------------+
| date_format(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(Second, None)")),Utf8("%Y-%m-%d %H:%M:%S:%3f")) |
+------------------------------------------------------------------------------------------------------------------------+
| 2023-12-06 07:39:46:000 |
+------------------------------------------------------------------------------------------------------------------------+
+-------------------------------------------------------------------------------------------------------------+
| date_format(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(s)")),Utf8("%Y-%m-%d %H:%M:%S:%3f")) |
+-------------------------------------------------------------------------------------------------------------+
| 2023-12-06 07:39:46:000 |
+-------------------------------------------------------------------------------------------------------------+
--- datetime not supported yet ---
SELECT date_format('2023-12-06 07:39:46.222'::DATETIME, '%Y-%m-%d %H:%M:%S:%3f');
+-----------------------------------------------------------------------------------------------------------------------------+
| date_format(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(Microsecond, None)")),Utf8("%Y-%m-%d %H:%M:%S:%3f")) |
+-----------------------------------------------------------------------------------------------------------------------------+
| 2023-12-06 07:39:46:222 |
+-----------------------------------------------------------------------------------------------------------------------------+
+--------------------------------------------------------------------------------------------------------------+
| date_format(arrow_cast(Utf8("2023-12-06 07:39:46.222"),Utf8("Timestamp(µs)")),Utf8("%Y-%m-%d %H:%M:%S:%3f")) |
+--------------------------------------------------------------------------------------------------------------+
| 2023-12-06 07:39:46:222 |
+--------------------------------------------------------------------------------------------------------------+
SELECT date_format('2023-12-06'::DATE, '%m-%d');

View File

@@ -335,11 +335,11 @@ FROM cell_cte;
SELECT json_encode_path(37.76938, -122.3889, 1728083375::TimestampSecond);
+----------------------------------------------------------------------------------------------------------------------+
| json_encode_path(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(1728083375),Utf8("Timestamp(Second, None)"))) |
+----------------------------------------------------------------------------------------------------------------------+
| [[-122.3889,37.76938]] |
+----------------------------------------------------------------------------------------------------------------------+
+-----------------------------------------------------------------------------------------------------------+
| json_encode_path(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(1728083375),Utf8("Timestamp(s)"))) |
+-----------------------------------------------------------------------------------------------------------+
| [[-122.3889,37.76938]] |
+-----------------------------------------------------------------------------------------------------------+
SELECT json_encode_path(lat, lon, ts)
FROM(
@@ -360,11 +360,11 @@ FROM(
SELECT UNNEST(geo_path(37.76938, -122.3889, 1728083375::TimestampSecond));
+----------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------+
| __unnest_placeholder(geo_path(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(1728083375),Utf8("Timestamp(Second, None)")))).lat | __unnest_placeholder(geo_path(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(1728083375),Utf8("Timestamp(Second, None)")))).lng |
+----------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------+
| [37.76938] | [-122.3889] |
+----------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------+
+-----------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------+
| __unnest_placeholder(geo_path(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(1728083375),Utf8("Timestamp(s)")))).lat | __unnest_placeholder(geo_path(Float64(37.76938),Float64(-122.3889),arrow_cast(Int64(1728083375),Utf8("Timestamp(s)")))).lng |
+-----------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------+
| [37.76938] | [-122.3889] |
+-----------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------+
SELECT UNNEST(geo_path(lat, lon, ts))
FROM(

View File

@@ -22,9 +22,9 @@ select GREATEST('2000-02-11'::Date, '2020-12-30'::Date);
select GREATEST('2021-07-01 00:00:00'::Timestamp, '2024-07-01 00:00:00'::Timestamp);
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| greatest(arrow_cast(Utf8("2021-07-01 00:00:00"),Utf8("Timestamp(Millisecond, None)")),arrow_cast(Utf8("2024-07-01 00:00:00"),Utf8("Timestamp(Millisecond, None)"))) |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 2024-07-01T00:00:00 |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------------------------------------------------------------------------------------------------------------------------------+
| greatest(arrow_cast(Utf8("2021-07-01 00:00:00"),Utf8("Timestamp(ms)")),arrow_cast(Utf8("2024-07-01 00:00:00"),Utf8("Timestamp(ms)"))) |
+---------------------------------------------------------------------------------------------------------------------------------------+
| 2024-07-01T00:00:00 |
+---------------------------------------------------------------------------------------------------------------------------------------+

View File

@@ -1,10 +1,10 @@
-- Migrated from DuckDB test: test/sql/join/ complex condition tests
-- Tests complex join conditions and predicates
CREATE TABLE sales_reps(rep_id INTEGER, "name" VARCHAR, region VARCHAR, quota INTEGER, ts TIMESTAMP TIME INDEX);
CREATE TABLE sales_reps(rep_id INTEGER, "name" VARCHAR, "region" VARCHAR, quota INTEGER, ts TIMESTAMP TIME INDEX);
Affected Rows: 0
CREATE TABLE customer_accounts(account_id INTEGER, account_name VARCHAR, region VARCHAR, rep_id INTEGER, revenue INTEGER, ts TIMESTAMP TIME INDEX);
CREATE TABLE customer_accounts(account_id INTEGER, account_name VARCHAR, "region" VARCHAR, rep_id INTEGER, revenue INTEGER, ts TIMESTAMP TIME INDEX);
Affected Rows: 0

View File

@@ -1,9 +1,9 @@
-- Migrated from DuckDB test: test/sql/join/ complex condition tests
-- Tests complex join conditions and predicates
CREATE TABLE sales_reps(rep_id INTEGER, "name" VARCHAR, region VARCHAR, quota INTEGER, ts TIMESTAMP TIME INDEX);
CREATE TABLE sales_reps(rep_id INTEGER, "name" VARCHAR, "region" VARCHAR, quota INTEGER, ts TIMESTAMP TIME INDEX);
CREATE TABLE customer_accounts(account_id INTEGER, account_name VARCHAR, region VARCHAR, rep_id INTEGER, revenue INTEGER, ts TIMESTAMP TIME INDEX);
CREATE TABLE customer_accounts(account_id INTEGER, account_name VARCHAR, "region" VARCHAR, rep_id INTEGER, revenue INTEGER, ts TIMESTAMP TIME INDEX);
INSERT INTO sales_reps VALUES
(1, 'Tom', 'North', 100000, 1000), (2, 'Sarah', 'South', 150000, 2000),

View File

@@ -3,7 +3,7 @@ CREATE TABLE events_push(event_id INTEGER, user_id INTEGER, event_type VARCHAR,
Affected Rows: 0
CREATE TABLE users_push(user_id INTEGER, user_name VARCHAR, region VARCHAR, ts TIMESTAMP TIME INDEX);
CREATE TABLE users_push(user_id INTEGER, user_name VARCHAR, "region" VARCHAR, ts TIMESTAMP TIME INDEX);
Affected Rows: 0

View File

@@ -2,7 +2,7 @@
CREATE TABLE events_push(event_id INTEGER, user_id INTEGER, event_type VARCHAR, "value" INTEGER, ts TIMESTAMP TIME INDEX);
CREATE TABLE users_push(user_id INTEGER, user_name VARCHAR, region VARCHAR, ts TIMESTAMP TIME INDEX);
CREATE TABLE users_push(user_id INTEGER, user_name VARCHAR, "region" VARCHAR, ts TIMESTAMP TIME INDEX);
INSERT INTO events_push VALUES (1, 100, 'click', 1, 1000), (2, 100, 'view', 2, 2000), (3, 200, 'click', 1, 3000), (4, 300, 'purchase', 5, 4000);

View File

@@ -25,7 +25,7 @@ SELECT b FROM test ORDER BY b LIMIT 2 OFFSET 0;
SELECT a FROM test LIMIT 1.25;
Error: 3001(EngineExecuteQuery), DataFusion error: Error during planning: Expected LIMIT to be an integer or null, but got Float64
Error: 3001(EngineExecuteQuery), Error during planning: Expected LIMIT to be an integer or null, but got Float64
SELECT a FROM test LIMIT 2-1;
@@ -45,11 +45,11 @@ Error: 3000(PlanQuery), Failed to plan SQL: No field named a.
SELECT a FROM test LIMIT SUM(42);
Error: 1001(Unsupported), This feature is not implemented: Unsupported LIMIT expression: Some(AggregateFunction(AggregateFunction { func: AggregateUDF { inner: Sum { signature: Signature { type_signature: UserDefined, volatility: Immutable } } }, params: AggregateFunctionParams { args: [Literal(Int64(42), None)], distinct: false, filter: None, order_by: [], null_treatment: None } }))
Error: 1001(Unsupported), This feature is not implemented: Unsupported LIMIT expression: Some(AggregateFunction(AggregateFunction { func: AggregateUDF { inner: Sum { signature: Signature { type_signature: UserDefined, volatility: Immutable, parameter_names: None } } }, params: AggregateFunctionParams { args: [Literal(Int64(42), None)], distinct: false, filter: None, order_by: [], null_treatment: None } }))
SELECT a FROM test LIMIT row_number() OVER ();
Error: 3001(EngineExecuteQuery), This feature is not implemented: Unsupported LIMIT expression: Some(Cast(Cast { expr: WindowFunction(WindowFunction { fun: WindowUDF(WindowUDF { inner: RowNumber { signature: Signature { type_signature: Nullary, volatility: Immutable } } }), params: WindowFunctionParams { args: [], partition_by: [], order_by: [], window_frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }, filter: None, null_treatment: None, distinct: false } }), data_type: Int64 }))
Error: 3001(EngineExecuteQuery), This feature is not implemented: Unsupported LIMIT expression: Some(Cast(Cast { expr: WindowFunction(WindowFunction { fun: WindowUDF(WindowUDF { inner: RowNumber { signature: Signature { type_signature: Nullary, volatility: Immutable, parameter_names: None } } }), params: WindowFunctionParams { args: [], partition_by: [], order_by: [], window_frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }, filter: None, null_treatment: None, distinct: false } }), data_type: Int64 }))
CREATE TABLE test2 (a STRING, ts TIMESTAMP TIME INDEX);
@@ -69,7 +69,7 @@ SELECT * FROM test2 LIMIT 3;
select 1 limit date '1992-01-01';
Error: 3001(EngineExecuteQuery), DataFusion error: Error during planning: Expected LIMIT to be an integer or null, but got Date32
Error: 3001(EngineExecuteQuery), Error during planning: Expected LIMIT to be an integer or null, but got Date32
CREATE TABLE integers(i TIMESTAMP TIME INDEX);
@@ -102,23 +102,23 @@ SELECT * FROM integers LIMIT 4;
SELECT * FROM integers as int LIMIT (SELECT MIN(integers.i) FROM integers);
Error: 3001(EngineExecuteQuery), DataFusion error: Error during planning: Expected LIMIT to be an integer or null, but got Timestamp(Millisecond, None)
Error: 3001(EngineExecuteQuery), Error during planning: Expected LIMIT to be an integer or null, but got Timestamp(ms)
SELECT * FROM integers as int OFFSET (SELECT MIN(integers.i) FROM integers);
Error: 3001(EngineExecuteQuery), DataFusion error: Error during planning: Expected OFFSET to be an integer or null, but got Timestamp(Millisecond, None)
Error: 3001(EngineExecuteQuery), Error during planning: Expected OFFSET to be an integer or null, but got Timestamp(ms)
SELECT * FROM integers as int LIMIT (SELECT MAX(integers.i) FROM integers) OFFSET (SELECT MIN(integers.i) FROM integers);
Error: 3001(EngineExecuteQuery), DataFusion error: Error during planning: Expected LIMIT to be an integer or null, but got Timestamp(Millisecond, None)
Error: 3001(EngineExecuteQuery), Error during planning: Expected LIMIT to be an integer or null, but got Timestamp(ms)
SELECT * FROM integers as int LIMIT (SELECT max(integers.i) FROM integers where i > 5);
Error: 3001(EngineExecuteQuery), DataFusion error: Error during planning: Cannot infer common argument type for comparison operation Timestamp(Millisecond, None) > Int64
Error: 3001(EngineExecuteQuery), Error during planning: Cannot infer common argument type for comparison operation Timestamp(ms) > Int64
SELECT * FROM integers as int LIMIT (SELECT max(integers.i) FROM integers where i > 5);
Error: 3001(EngineExecuteQuery), DataFusion error: Error during planning: Cannot infer common argument type for comparison operation Timestamp(Millisecond, None) > Int64
Error: 3001(EngineExecuteQuery), Error during planning: Cannot infer common argument type for comparison operation Timestamp(ms) > Int64
SELECT * FROM integers as int LIMIT (SELECT NULL);
@@ -130,7 +130,7 @@ Error: 1001(Unsupported), This feature is not implemented: Unsupported LIMIT exp
SELECT * FROM integers as int LIMIT (SELECT 'ab');
Error: 3001(EngineExecuteQuery), DataFusion error: Error during planning: Expected LIMIT to be an integer or null, but got Utf8
Error: 3001(EngineExecuteQuery), Error during planning: Expected LIMIT to be an integer or null, but got Utf8
DROP TABLE integers;

View File

@@ -42,7 +42,7 @@ affected_rows: 0
-- SQLNESS PROTOCOL MYSQL
EXECUTE stmt USING 'a';
affected_rows: 0
Failed to execute query, err: MySqlError { ERROR 1815 (HY000): (EngineExecuteQuery): Cast error: Cannot cast string 'a' to value of Int32 type }
-- SQLNESS PROTOCOL MYSQL
DEALLOCATE stmt;

View File

@@ -732,7 +732,7 @@ CREATE TABLE IF NOT EXISTS node_network_transmit_bytes_total (
host STRING NULL,
job STRING NULL,
node STRING NULL,
region STRING NULL,
"region" STRING NULL,
src_port STRING NULL,
src STRING NULL,
src_namespace STRING NULL,

View File

@@ -349,7 +349,7 @@ CREATE TABLE IF NOT EXISTS node_network_transmit_bytes_total (
host STRING NULL,
job STRING NULL,
node STRING NULL,
region STRING NULL,
"region" STRING NULL,
src_port STRING NULL,
src STRING NULL,
src_namespace STRING NULL,

View File

@@ -45,19 +45,19 @@ SELECT
FROM
ngx_access_log);
+---------------+-----------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: count(Int64(1)) AS count(*) |
| | Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] |
| | Projection: TimestampNanosecond(NOW, Some("+00:00")) AS now() |
| | TableScan: ngx_access_log |
| | ]] |
| physical_plan | CooperativeExec |
+---------------+-------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | Projection: count(Int64(1)) AS count(*) |
| | Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] |
| | Projection: TimestampNanosecond(NOW, None) AS now() |
| | TableScan: ngx_access_log |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------+
| | |
+---------------+-------------------------------------------------------------------------+
DROP TABLE ngx_access_log;

View File

@@ -760,10 +760,8 @@ DESC TABLE COLUMN_PRIVILEGES;
SELECT * FROM COLUMN_PRIVILEGES;
+---------+---------------+--------------+------------+-------------+----------------+--------------+
| grantee | table_catalog | table_schema | table_name | column_name | privilege_type | is_grantable |
+---------+---------------+--------------+------------+-------------+----------------+--------------+
+---------+---------------+--------------+------------+-------------+----------------+--------------+
++
++
DESC TABLE COLUMN_STATISTICS;
@@ -778,10 +776,8 @@ DESC TABLE COLUMN_STATISTICS;
SELECT * FROM COLUMN_STATISTICS;
+-------------+------------+-------------+-----------+
| schema_name | table_name | column_name | histogram |
+-------------+------------+-------------+-----------+
+-------------+------------+-------------+-----------+
++
++
SELECT * FROM CHARACTER_SETS;
@@ -820,10 +816,8 @@ DESC TABLE CHECK_CONSTRAINTS;
SELECT * FROM CHECK_CONSTRAINTS;
+--------------------+-------------------+-----------------+--------------+
| constraint_catalog | constraint_schema | constraint_name | check_clause |
+--------------------+-------------------+-----------------+--------------+
+--------------------+-------------------+-----------------+--------------+
++
++
DESC TABLE REGION_PEERS;

Some files were not shown because too many files have changed in this diff Show More