greptimedb/tests-integration/src/otlp.rs
Yingwen 35b635f639 feat!: Bump datafusion, prost, hyper, tonic, tower, axum (#5417)
* change dep

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: adapt to arrow's interval array

* chore: fix compile errors in datatypes crate

* chore: fix api crate compiler errors

* chore: fix compiler errors in common-grpc

* chore: fix common-datasource errors

* chore: fix deprecated code in common-datasource

* fix promql and physical plan related

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* wip: upgrading network deps

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* block on updating `sqlparser`

* upgrade sqlparser

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* adapt new df's trait requirements

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: fix compiler errors in mito2

* chore: fix common-function crate errors

* chore: fix catalog errors

* change import path

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: fix some errors in query crate

* chore: fix some errors in query crate

* aggr expr and some other tiny fixes

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: fix expr related errors in query crate

* chore: fix query serializer and admin command

* chore: fix grpc services

* feat: axum serve

* chore: fix http server

* remove handle_error handler
* refactor timeout layer
* serve axum

* chore: fix flow aggr functions

* chore: fix flow

* feat: fix errors in meta-srv

* boxed()
* use TokioIo

* feat!: Remove script crate and python feature (#5321)

* feat: exclude script crate

* chore: simplify feature

* feat: remove the script crate

* chore: remove python feature and some comments

* chore: fix warning

* chore: fix servers tests compiler errors

* feat: fix tests-integration errors

* chore: fix unused

* test: fix catalog test

* chore: fix compiler errors for crates using common-meta

testing feature is enabled when check with --workspace

* test: use display for logical plan test

* test: implement rewrite for ScanHintRule

* fix: http server build panic

* test: fix mito test

* fix: sql parser type alias error

* test: fix TestClient not listen

* test: some flow tests

* test(flow): more fix

* fix: test_otlp_logs

* test: fix promql test that using deprecated method fun()

* fix: sql type replace supports Int8 ~ Int64, UInt8 ~ UInt64

* test: fix infer schema test case

* test: fix tests related to plan display

* chore: fix last flow test

* test: fix function format related assertion

* test: use larger port range for tests

* fix: test_otlp_traces

* fix: test_otlp_metrics

* fix range query and dist plan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: flow handle distinct use deprecated field

* fix: can't pass Join plan expressions to LogicalPlan::with_new_exprs

* test: fix deserialize test

* test: reduce split key case num

* tests: lower case aggr func name

* test: fix some sqlness tests

* tests: more sqlness fix

* tests: fixed sqlness test

* commit non-bug changes

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: make our udf correct

* fix: implement empty methods of ContextProvider for DfContextProviderAdapter

* test: update sqlness test result

* chore: remove unused

* fix: provide alias name for AggregateExprBuilder in range plan

* test: update range query result

* fix: implement missing ContextProvider methods for DfContextProviderAdapter

* test: update timestamps, cte result

* fix: supports empty projection in mito

* test: update comment for cte test

* fix: support projection for numbers

* test: update test cases after projection fix

* fix: fix range select first_value/last_value

* fix: handle CAST and time index conflict

* fix: handle order by correctly in range first_value/last_value

* test: update sqlness result

* test: update view test result

* test: update decimal test

wait for https://github.com/apache/datafusion/pull/14126 to fix this

* feat: remove redundant physical optimization

todo(ruihang): Check if we can remove this.

* test: update sqlness test result

* chore: range select default sort use nulls_first = false

* test: update filter push down test result

* test: comment decimal test to avoid different panic message

* test: update some distributed test result

* test: update test for distributed count and filter push down

* test: update subqueries test

* fix: SessionState may overwrite our UDFs

* chore: fix compiler errors after merging main

* fix: fix elasticsearch and dashboard router panic

* chore: fix common-functions tests

* chore: update sqlness result

* test: fix id keyword and update sqlness result

* test: fix flow_null test

* fix: enlarge thread size in debug mode to avoid overflow

* chore: fix warnings in common-function

* chore: fix warning in flow

* chore: fix warnings in query crate

* chore: remove unused warnings

* chore: fix deprecated warnings for parquet

* chore: fix deprecated warning in servers crate

* style: fix clippy

* test: enlarge mito cache ttl test ttl time

* chore: fix typo

* style: fmt toml

* refactor: reimplement PartialOrd for RangeSelect

* chore: remove script crate files introduced by merge

* fix: return error if sql option is not kv

* chore: do not use ..default::default()

* chore: per review

* chore: update error message in BuildAdminFunctionArgsSnafu

Co-authored-by: jeremyhi <jiachun_feng@proton.me>

* refactor: typed precision

* update sqlness view case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: flow per review

* chore: add example in comment

* chore: warn if parquet stats of timestamp is not INT64

* style: add a newline before derive to make the comment more clear

* test: update sqlness result

* fix: flow from substrait

* chore: change update_range_context log to debug level

* chore: move axum-extra axum-macros to workspace

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: luofucong <luofc@foxmail.com>
Co-authored-by: discord9 <discord9@163.com>
Co-authored-by: shuiyisong <xixing.sys@gmail.com>
Co-authored-by: jeremyhi <jiachun_feng@proton.me>
2025-01-23 06:15:40 +00:00

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use client::{OutputData, DEFAULT_CATALOG_NAME};
    use common_recordbatch::RecordBatches;
    use frontend::instance::Instance;
    use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
    use opentelemetry_proto::tonic::common::v1::any_value::Value as Val;
    use opentelemetry_proto::tonic::common::v1::{AnyValue, InstrumentationScope, KeyValue};
    use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value;
    use opentelemetry_proto::tonic::metrics::v1::{metric, NumberDataPoint, *};
    use opentelemetry_proto::tonic::resource::v1::Resource;
    use servers::query_handler::sql::SqlQueryHandler;
    use servers::query_handler::OpenTelemetryProtocolHandler;
    use session::context::QueryContext;

    use crate::standalone::GreptimeDbStandaloneBuilder;
    use crate::tests;

    #[tokio::test(flavor = "multi_thread")]
    pub async fn test_otlp_on_standalone() {
        let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_otlp")
            .build()
            .await;
        let instance = &standalone.instance;

        test_otlp(instance).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    pub async fn test_otlp_on_distributed() {
        // Use a test name that matches the distributed setup rather than the
        // standalone one.
        let instance = tests::create_distributed_instance("test_distributed_otlp").await;

        test_otlp(&instance.frontend()).await;
    }
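
    /// Writes one OTLP metrics request and checks the tables it produces.
    async fn test_otlp(instance: &Arc<Instance>) {
        let req = build_request();
        let db = "otlp";
        let ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, db));

        // The target database must exist before metrics are written into it.
        assert!(SqlQueryHandler::do_query(
            instance.as_ref(),
            &format!("CREATE DATABASE IF NOT EXISTS {db}"),
            ctx.clone(),
        )
        .await
        .first()
        .unwrap()
        .is_ok());

        // Ingest the gauge and the histogram through the OTLP metrics handler.
        let resp = instance.metrics(req, ctx.clone()).await;
        assert!(resp.is_ok());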

        let mut output = instance
            .do_query(
                "SELECT * FROM my_test_metric ORDER BY greptime_timestamp",
                ctx.clone(),
            )
            .await;
        let output = output.remove(0).unwrap();
        let OutputData::Stream(stream) = output.data else {
            unreachable!()
        };
        let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
        assert_eq!(
            recordbatches.pretty_print().unwrap(),
            "\
+------------+-------+--------------------+------------+-------------------------------+----------------+
| resource   | scope | telemetry_sdk_name | host       | greptime_timestamp            | greptime_value |
+------------+-------+--------------------+------------+-------------------------------+----------------+
| greptimedb | otel  | java               | testsevrer | 1970-01-01T00:00:00.000000100 | 100.0          |
| greptimedb | otel  | java               | testserver | 1970-01-01T00:00:00.000000105 | 105.0          |
+------------+-------+--------------------+------------+-------------------------------+----------------+",
        );

        let mut output = instance
            .do_query(
                "SELECT le, greptime_value FROM my_test_histo_bucket order by le",
                ctx.clone(),
            )
            .await;
        let output = output.remove(0).unwrap();
        let OutputData::Stream(stream) = output.data else {
            unreachable!()
        };
        let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
        assert_eq!(
            recordbatches.pretty_print().unwrap(),
            "\
+-----+----------------+
| le  | greptime_value |
+-----+----------------+
| 1   | 1.0            |
| 5   | 3.0            |
| inf | 4.0            |
+-----+----------------+",
        );

        let mut output = instance
            .do_query("SELECT * FROM my_test_histo_sum", ctx.clone())
            .await;
        let output = output.remove(0).unwrap();
        let OutputData::Stream(stream) = output.data else {
            unreachable!()
        };
        let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
        assert_eq!(
            recordbatches.pretty_print().unwrap(),
            "\
+------------+-------+--------------------+------------+-------------------------------+----------------+
| resource   | scope | telemetry_sdk_name | host       | greptime_timestamp            | greptime_value |
+------------+-------+--------------------+------------+-------------------------------+----------------+
| greptimedb | otel  | java               | testserver | 1970-01-01T00:00:00.000000100 | 51.0           |
+------------+-------+--------------------+------------+-------------------------------+----------------+",
        );

        let mut output = instance
            .do_query("SELECT * FROM my_test_histo_count", ctx.clone())
            .await;
        let output = output.remove(0).unwrap();
        let OutputData::Stream(stream) = output.data else {
            unreachable!()
        };
        let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
        assert_eq!(
            recordbatches.pretty_print().unwrap(),
            "\
+------------+-------+--------------------+------------+-------------------------------+----------------+
| resource   | scope | telemetry_sdk_name | host       | greptime_timestamp            | greptime_value |
+------------+-------+--------------------+------------+-------------------------------+----------------+
| greptimedb | otel  | java               | testserver | 1970-01-01T00:00:00.000000100 | 4.0            |
+------------+-------+--------------------+------------+-------------------------------+----------------+"
        );
    }

    /// Builds a request carrying one gauge metric and one histogram metric.
    fn build_request() -> ExportMetricsServiceRequest {
        // Two gauge data points, 5 ns apart, with different `host` attributes.
        let data_points = vec![
            NumberDataPoint {
                attributes: vec![keyvalue("host", "testsevrer")],
                time_unix_nano: 100,
                value: Some(Value::AsInt(100)),
                ..Default::default()
            },
            NumberDataPoint {
                attributes: vec![keyvalue("host", "testserver")],
                time_unix_nano: 105,
                value: Some(Value::AsInt(105)),
                ..Default::default()
            },
        ];
        let gauge = Gauge { data_points };

        // `bucket_counts` are per-bucket (non-cumulative) counts for the
        // buckets (-inf, 1], (1, 5] and (5, +inf); they add up to `count`.
        let histo_data_points = vec![HistogramDataPoint {
            attributes: vec![keyvalue("host", "testserver")],
            time_unix_nano: 100,
            count: 4,
            bucket_counts: vec![1, 2, 1],
            explicit_bounds: vec![1.0f64, 5.0f64],
            sum: Some(51f64),
            ..Default::default()
        }];

        let histo = Histogram {
            data_points: histo_data_points,
            // 0 is AGGREGATION_TEMPORALITY_UNSPECIFIED in the OTLP proto.
            aggregation_temporality: 0,
        };

        ExportMetricsServiceRequest {
            resource_metrics: vec![ResourceMetrics {
                scope_metrics: vec![ScopeMetrics {
                    metrics: vec![
                        Metric {
                            name: "my.test.metric".into(),
                            description: "my ignored desc".into(),
                            unit: "my ignored unit".into(),
                            metadata: vec![],
                            data: Some(metric::Data::Gauge(gauge)),
                        },
                        Metric {
                            name: "my.test.histo".into(),
                            description: "my ignored desc".into(),
                            unit: "my ignored unit".into(),
                            metadata: vec![],
                            data: Some(metric::Data::Histogram(histo)),
                        },
                    ],
                    scope: Some(InstrumentationScope {
                        attributes: vec![
                            keyvalue("scope", "otel"),
                            keyvalue("telemetry.sdk.name", "java"),
                        ],
                        ..Default::default()
                    }),
                    ..Default::default()
                }],
                resource: Some(Resource {
                    attributes: vec![keyvalue("resource", "greptimedb")],
                    dropped_attributes_count: 0,
                }),
                ..Default::default()
            }],
        }
    }

    fn keyvalue(key: &str, value: &str) -> KeyValue {
        KeyValue {
            key: key.into(),
            value: Some(AnyValue {
                value: Some(Val::StringValue(value.into())),
            }),
        }
    }
}
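
The assertions in `test_otlp` rely on two behaviours that the test itself never spells out: dotted OTLP names such as `my.test.histo` and `telemetry.sdk.name` show up as `my_test_histo_*` tables and a `telemetry_sdk_name` column, and the per-bucket counts `[1, 2, 1]` come back as the cumulative values 1, 3 and 4 under `le`. The sketch below reproduces that arithmetic in isolation. It is not GreptimeDB's implementation: `normalize_name` and `cumulative_buckets` are hypothetical helpers, and the dot-to-underscore rule is an assumption inferred only from the names used in the test.

```rust
/// Hypothetical helper: maps an OTLP name to a table or column name by
/// replacing dots with underscores (an assumption inferred from the test).
fn normalize_name(name: &str) -> String {
    name.replace('.', "_")
}

/// Hypothetical helper: turns per-bucket counts into cumulative `(le, count)`
/// pairs. Bucket `i` is labelled with `explicit_bounds[i]`; the last bucket
/// has no upper bound and is labelled "inf".
fn cumulative_buckets(explicit_bounds: &[f64], bucket_counts: &[u64]) -> Vec<(String, u64)> {
    let mut running = 0u64;
    bucket_counts
        .iter()
        .enumerate()
        .map(|(i, count)| {
            running += count;
            let le = match explicit_bounds.get(i) {
                Some(bound) => bound.to_string(),
                None => "inf".to_string(),
            };
            (le, running)
        })
        .collect()
}
```

Called with the values from `build_request`, `cumulative_buckets(&[1.0, 5.0], &[1, 2, 1])` yields the pairs `("1", 1)`, `("5", 3)` and `("inf", 4)`, which match the rows expected from `my_test_histo_bucket`. The final cumulative value equals the data point's `count` of 4.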