mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-24 00:40:40 +00:00
feat: implement promql query on grpc (#1134)
* feat: implement promql query on grpc * test: resolve test errors * test: add tests for promql grpc api * refactor: align prom object name with proto * chore: switch proto revision to main
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -3062,7 +3062,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ad0187295035e83f76272da553453e649b7570de#ad0187295035e83f76272da553453e649b7570de"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=3a715150563b89d5dfc81a5838eac1f66a5658a1#3a715150563b89d5dfc81a5838eac1f66a5658a1"
|
||||
dependencies = [
|
||||
"prost",
|
||||
"tonic",
|
||||
|
||||
@@ -10,7 +10,7 @@ common-base = { path = "../common/base" }
|
||||
common-error = { path = "../common/error" }
|
||||
common-time = { path = "../common/time" }
|
||||
datatypes = { path = "../datatypes" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ad0187295035e83f76272da553453e649b7570de" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3a715150563b89d5dfc81a5838eac1f66a5658a1" }
|
||||
prost.workspace = true
|
||||
snafu = { version = "0.7", features = ["backtraces"] }
|
||||
tonic.workspace = true
|
||||
|
||||
@@ -20,7 +20,7 @@ use api::v1::greptime_request::Request;
|
||||
use api::v1::query_request::Query;
|
||||
use api::v1::{
|
||||
AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, DropTableExpr, GreptimeRequest,
|
||||
InsertRequest, QueryRequest, RequestHeader,
|
||||
InsertRequest, PromRangeQuery, QueryRequest, RequestHeader,
|
||||
};
|
||||
use arrow_flight::{FlightData, Ticket};
|
||||
use common_error::prelude::*;
|
||||
@@ -96,6 +96,24 @@ impl Database {
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn prom_range_query(
|
||||
&self,
|
||||
promql: &str,
|
||||
start: &str,
|
||||
end: &str,
|
||||
step: &str,
|
||||
) -> Result<Output> {
|
||||
self.do_get(Request::Query(QueryRequest {
|
||||
query: Some(Query::PromRangeQuery(PromRangeQuery {
|
||||
query: promql.to_string(),
|
||||
start: start.to_string(),
|
||||
end: end.to_string(),
|
||||
step: step.to_string(),
|
||||
})),
|
||||
}))
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn create(&self, expr: CreateTableExpr) -> Result<Output> {
|
||||
self.do_get(Request::Ddl(DdlRequest {
|
||||
expr: Some(DdlExpr::CreateTable(expr)),
|
||||
|
||||
@@ -18,7 +18,7 @@ use api::v1::query_request::Query;
|
||||
use api::v1::{CreateDatabaseExpr, DdlRequest, InsertRequest};
|
||||
use async_trait::async_trait;
|
||||
use common_query::Output;
|
||||
use query::parser::QueryLanguageParser;
|
||||
use query::parser::{PromQuery, QueryLanguageParser};
|
||||
use query::plan::LogicalPlan;
|
||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
use session::context::QueryContextRef;
|
||||
@@ -61,6 +61,15 @@ impl Instance {
|
||||
self.execute_stmt(stmt, ctx).await?
|
||||
}
|
||||
Query::LogicalPlan(plan) => self.execute_logical(plan).await?,
|
||||
Query::PromRangeQuery(promql) => {
|
||||
let prom_query = PromQuery {
|
||||
query: promql.query,
|
||||
start: promql.start,
|
||||
end: promql.end,
|
||||
step: promql.step,
|
||||
};
|
||||
self.execute_promql(&prom_query, ctx).await?
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ use api::v1::greptime_request::Request;
|
||||
use api::v1::query_request::Query;
|
||||
use async_trait::async_trait;
|
||||
use common_query::Output;
|
||||
use query::parser::PromQuery;
|
||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
use servers::query_handler::sql::SqlQueryHandler;
|
||||
use session::context::QueryContextRef;
|
||||
@@ -54,6 +55,23 @@ impl GrpcQueryHandler for Instance {
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
Query::PromRangeQuery(promql) => {
|
||||
let prom_query = PromQuery {
|
||||
query: promql.query,
|
||||
start: promql.start,
|
||||
end: promql.end,
|
||||
step: promql.step,
|
||||
};
|
||||
let mut result =
|
||||
SqlQueryHandler::do_promql_query(self, &prom_query, ctx).await;
|
||||
ensure!(
|
||||
result.len() == 1,
|
||||
error::NotSupportedSnafu {
|
||||
feat: "execute multiple statements in PromQL query string through GRPC interface"
|
||||
}
|
||||
);
|
||||
result.remove(0)?
|
||||
}
|
||||
}
|
||||
}
|
||||
Request::Ddl(request) => {
|
||||
@@ -542,4 +560,100 @@ CREATE TABLE {table_name} (
|
||||
+---------------------+---+---+";
|
||||
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_promql_query() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let standalone = tests::create_standalone_instance("test_standalone_promql_query").await;
|
||||
let instance = &standalone.instance;
|
||||
|
||||
let table_name = "my_table";
|
||||
let sql = format!("CREATE TABLE {table_name} (h string, a double, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY(h))");
|
||||
create_table(instance, sql).await;
|
||||
|
||||
let insert = InsertRequest {
|
||||
table_name: table_name.to_string(),
|
||||
columns: vec![
|
||||
Column {
|
||||
column_name: "h".to_string(),
|
||||
values: Some(Values {
|
||||
string_values: vec![
|
||||
"t".to_string(),
|
||||
"t".to_string(),
|
||||
"t".to_string(),
|
||||
"t".to_string(),
|
||||
"t".to_string(),
|
||||
"t".to_string(),
|
||||
"t".to_string(),
|
||||
"t".to_string(),
|
||||
],
|
||||
..Default::default()
|
||||
}),
|
||||
semantic_type: SemanticType::Tag as i32,
|
||||
datatype: ColumnDataType::String as i32,
|
||||
..Default::default()
|
||||
},
|
||||
Column {
|
||||
column_name: "a".to_string(),
|
||||
values: Some(Values {
|
||||
f64_values: vec![1f64, 11f64, 20f64, 22f64, 50f64, 55f64, 99f64],
|
||||
..Default::default()
|
||||
}),
|
||||
null_mask: vec![4],
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
datatype: ColumnDataType::Float64 as i32,
|
||||
},
|
||||
Column {
|
||||
column_name: "ts".to_string(),
|
||||
values: Some(Values {
|
||||
ts_millisecond_values: vec![
|
||||
1672557972000,
|
||||
1672557973000,
|
||||
1672557974000,
|
||||
1672557975000,
|
||||
1672557976000,
|
||||
1672557977000,
|
||||
1672557978000,
|
||||
1672557979000,
|
||||
],
|
||||
..Default::default()
|
||||
}),
|
||||
semantic_type: SemanticType::Timestamp as i32,
|
||||
datatype: ColumnDataType::TimestampMillisecond as i32,
|
||||
..Default::default()
|
||||
},
|
||||
],
|
||||
row_count: 8,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let request = Request::Insert(insert);
|
||||
let output = query(instance, request).await;
|
||||
assert!(matches!(output, Output::AffectedRows(8)));
|
||||
|
||||
let request = Request::Query(QueryRequest {
|
||||
query: Some(Query::PromRangeQuery(api::v1::PromRangeQuery {
|
||||
query: "my_table".to_owned(),
|
||||
start: "1672557973".to_owned(),
|
||||
end: "1672557978".to_owned(),
|
||||
step: "1s".to_owned(),
|
||||
})),
|
||||
});
|
||||
let output = query(instance, request).await;
|
||||
let Output::Stream(stream) = output else { unreachable!() };
|
||||
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
let expected = "\
|
||||
+---+------+---------------------+
|
||||
| h | a | ts |
|
||||
+---+------+---------------------+
|
||||
| t | 11.0 | 2023-01-01T07:26:13 |
|
||||
| t | | 2023-01-01T07:26:14 |
|
||||
| t | 20.0 | 2023-01-01T07:26:15 |
|
||||
| t | 22.0 | 2023-01-01T07:26:16 |
|
||||
| t | 50.0 | 2023-01-01T07:26:17 |
|
||||
| t | 55.0 | 2023-01-01T07:26:18 |
|
||||
+---+------+---------------------+";
|
||||
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -175,6 +175,23 @@ impl GrpcQueryHandler for DummyInstance {
|
||||
result.remove(0)?
|
||||
}
|
||||
Query::LogicalPlan(_) => unimplemented!(),
|
||||
Query::PromRangeQuery(promql) => {
|
||||
let prom_query = PromQuery {
|
||||
query: promql.query,
|
||||
start: promql.start,
|
||||
end: promql.end,
|
||||
step: promql.step,
|
||||
};
|
||||
let mut result =
|
||||
SqlQueryHandler::do_promql_query(self, &prom_query, ctx).await;
|
||||
ensure!(
|
||||
result.len() == 1,
|
||||
NotSupportedSnafu {
|
||||
feat: "execute multiple statements in PromQL query string through GRPC interface"
|
||||
}
|
||||
);
|
||||
result.remove(0)?
|
||||
}
|
||||
}
|
||||
}
|
||||
Request::Ddl(_) => unimplemented!(),
|
||||
|
||||
Reference in New Issue
Block a user