feat: impl instant query and add tests (#1452)

* feat: impl instant query and add tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-04-25 11:08:14 +08:00
committed by GitHub
parent 2287db7ff7
commit f9ea6b63bf
6 changed files with 170 additions and 27 deletions

2
Cargo.lock generated
View File

@@ -3823,7 +3823,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=57e6a032db70264ec394cbb8a2f327ad33705d76#57e6a032db70264ec394cbb8a2f327ad33705d76"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a26c40c004f998180b8acd853b22f083773f36b9#a26c40c004f998180b8acd853b22f083773f36b9"
dependencies = [
"prost",
"tonic 0.9.1",

View File

@@ -10,7 +10,7 @@ common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "57e6a032db70264ec394cbb8a2f327ad33705d76" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a26c40c004f998180b8acd853b22f083773f36b9" }
prost.workspace = true
snafu = { version = "0.7", features = ["backtraces"] }
tonic.workspace = true

View File

@@ -16,6 +16,7 @@ use std::sync::Arc;
use api::v1::greptime_database_client::GreptimeDatabaseClient;
use api::v1::health_check_client::HealthCheckClient;
use api::v1::prometheus_gateway_client::PrometheusGatewayClient;
use api::v1::HealthCheckRequest;
use arrow_flight::flight_service_client::FlightServiceClient;
use common_grpc::channel_manager::ChannelManager;
@@ -156,6 +157,11 @@ impl Client {
})
}
/// Builds a Prometheus gateway gRPC client on top of one of this client's
/// managed channels. Fails only if no channel can be resolved.
pub fn make_prometheus_gateway_client(&self) -> Result<PrometheusGatewayClient<Channel>> {
    self.find_channel()
        .map(|(_addr, channel)| PrometheusGatewayClient::new(channel))
}
pub async fn health_check(&self) -> Result<()> {
let (_, channel) = self.find_channel()?;
let mut client = HealthCheckClient::new(channel);

View File

@@ -20,8 +20,10 @@ use api::v1::promql_request::Promql;
use api::v1::{PromqlRequest, PromqlResponse, ResponseHeader};
use async_trait::async_trait;
use query::parser::PromQuery;
use snafu::OptionExt;
use tonic::{Request, Response};
use crate::error::InvalidQuerySnafu;
use crate::grpc::handler::create_query_context;
use crate::grpc::TonicResult;
use crate::prom::{retrieve_metric_name, PromHandlerRef, PromJsonResponse};
@@ -34,14 +36,21 @@ pub struct PrometheusGatewayService {
impl PrometheusGateway for PrometheusGatewayService {
async fn handle(&self, req: Request<PromqlRequest>) -> TonicResult<Response<PromqlResponse>> {
let inner = req.into_inner();
let prom_query = match inner.promql {
Some(Promql::RangeQuery(range_query)) => PromQuery {
let prom_query = match inner.promql.context(InvalidQuerySnafu {
reason: "Expecting non-empty PromqlRequest.",
})? {
Promql::RangeQuery(range_query) => PromQuery {
query: range_query.query,
start: range_query.start,
end: range_query.end,
step: range_query.step,
},
_ => unimplemented!(),
Promql::InstantQuery(instant_query) => PromQuery {
query: instant_query.query,
start: instant_query.time.clone(),
end: instant_query.time,
step: String::from("1s"),
},
};
let query_context = create_query_context(inner.header.as_ref());

View File

@@ -13,7 +13,7 @@
// limitations under the License.
//! prom supply the prometheus HTTP API Server compliance
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::net::SocketAddr;
use std::sync::Arc;
@@ -156,30 +156,30 @@ impl Server for PromServer {
}
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)]
pub struct PromSeries {
metric: HashMap<String, String>,
values: Vec<(f64, String)>,
pub metric: HashMap<String, String>,
pub values: Vec<(f64, String)>,
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)]
pub struct PromData {
#[serde(rename = "resultType")]
result_type: String,
result: Vec<PromSeries>,
pub result_type: String,
pub result: Vec<PromSeries>,
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)]
pub struct PromJsonResponse {
status: String,
data: PromData,
pub status: String,
pub data: PromData,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
pub error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "errorType")]
error_type: Option<String>,
pub error_type: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
warnings: Option<Vec<String>>,
pub warnings: Option<Vec<String>>,
}
impl PromJsonResponse {
@@ -281,7 +281,7 @@ impl PromJsonResponse {
})?;
let metric_name = (METRIC_NAME.to_string(), metric_name);
let mut buffer = HashMap::<Vec<(String, String)>, Vec<(f64, String)>>::new();
let mut buffer = BTreeMap::<Vec<(String, String)>, Vec<(f64, String)>>::new();
for batch in batches.iter() {
// prepare things...
@@ -338,7 +338,7 @@ impl PromJsonResponse {
metric: tags.into_iter().collect(),
values,
})
.collect();
.collect::<Vec<_>>();
let data = PromData {
result_type: "matrix".to_string(),

View File

@@ -11,15 +11,18 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::alter_expr::Kind;
use api::v1::column::SemanticType;
use api::v1::promql_request::Promql;
use api::v1::{
column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, CreateTableExpr,
InsertRequest, TableId,
InsertRequest, PromInstantQuery, PromRangeQuery, PromqlRequest, RequestHeader, TableId,
};
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE};
use common_query::Output;
use servers::prom::{PromData, PromJsonResponse, PromSeries};
use servers::server::Server;
use tests_integration::test_util::{setup_grpc_server, StorageType};
@@ -53,18 +56,20 @@ macro_rules! grpc_tests {
grpc_test!(
$service,
test_invalid_dbname,
test_auto_create_table,
test_insert_and_select,
test_dbname,
test_health_check,
test_prom_gateway_query,
);
)*
};
}
#[tokio::test]
pub async fn test_invalid_dbname() {
pub async fn test_invalid_dbname(store_type: StorageType) {
let (addr, mut guard, fe_grpc_server) =
setup_grpc_server(StorageType::File, "auto_create_table").await;
setup_grpc_server(store_type, "auto_create_table").await;
let grpc_client = Client::with_urls(vec![addr]);
let db = Database::new_with_dbname("tom", grpc_client);
@@ -308,10 +313,9 @@ fn testing_create_expr() -> CreateTableExpr {
}
}
#[tokio::test]
pub async fn test_health_check() {
pub async fn test_health_check(store_type: StorageType) {
let (addr, mut guard, fe_grpc_server) =
setup_grpc_server(StorageType::File, "auto_create_table").await;
setup_grpc_server(store_type, "auto_create_table").await;
let grpc_client = Client::with_urls(vec![addr]);
let r = grpc_client.health_check().await;
@@ -320,3 +324,127 @@ pub async fn test_health_check() {
let _ = fe_grpc_server.shutdown().await;
guard.remove_all().await;
}
/// Integration test for the Prometheus gateway gRPC service: issues an
/// instant query and a range query against freshly inserted data and checks
/// that the JSON bodies deserialize into the expected `PromJsonResponse`.
pub async fn test_prom_gateway_query(store_type: StorageType) {
// prepare connection
let (addr, mut guard, fe_grpc_server) = setup_grpc_server(store_type, "prom_gateway").await;
let grpc_client = Client::with_urls(vec![addr]);
let db = Database::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
grpc_client.clone(),
);
let mut gateway_client = grpc_client.make_prometheus_gateway_client().unwrap();
// create table and insert data
// k is the PRIMARY KEY, so it becomes the series-distinguishing tag below.
db.sql("CREATE TABLE test(i DOUBLE, j TIMESTAMP TIME INDEX, k STRING PRIMARY KEY);")
.await
.unwrap();
db.sql(r#"INSERT INTO test VALUES (1, 1, "a"), (1, 1, "b"), (2, 2, "a");"#)
.await
.unwrap();
// Instant query using prometheus gateway service
let header = RequestHeader {
dbname: "public".to_string(),
..Default::default()
};
// An instant query carries a single evaluation timestamp; the gateway
// handler evaluates it as a degenerate range (start == end == time).
let instant_query = PromInstantQuery {
query: "test".to_string(),
time: "5".to_string(),
};
let instant_query_request = PromqlRequest {
header: Some(header.clone()),
promql: Some(Promql::InstantQuery(instant_query)),
};
// The gateway returns the Prometheus-compatible JSON payload as raw bytes
// in the response body; deserialize it for structural comparison.
let json_bytes = gateway_client
.handle(instant_query_request)
.await
.unwrap()
.into_inner()
.body;
let instant_query_result = serde_json::from_slice::<PromJsonResponse>(&json_bytes).unwrap();
// NOTE(review): series order in `result` looks deterministic because the
// server collects series in a BTreeMap keyed by the tag list — confirm if
// this assertion ever flakes.
let expected = PromJsonResponse {
status: "success".to_string(),
data: PromData {
result_type: "matrix".to_string(),
result: vec![
PromSeries {
metric: [
("k".to_string(), "a".to_string()),
("__name__".to_string(), "test".to_string()),
]
.into_iter()
.collect(),
values: vec![(5.0, "2".to_string())],
},
PromSeries {
metric: [
("__name__".to_string(), "test".to_string()),
("k".to_string(), "b".to_string()),
]
.into_iter()
.collect(),
values: vec![(5.0, "1".to_string())],
},
],
},
error: None,
error_type: None,
warnings: None,
};
assert_eq!(instant_query_result, expected);
// Range query using prometheus gateway service
// Window [0, 10] with a 5s step yields evaluation points at 5 and 10.
let range_query = PromRangeQuery {
query: "test".to_string(),
start: "0".to_string(),
end: "10".to_string(),
step: "5s".to_string(),
};
let range_query_request: PromqlRequest = PromqlRequest {
header: Some(header),
promql: Some(Promql::RangeQuery(range_query)),
};
let json_bytes = gateway_client
.handle(range_query_request)
.await
.unwrap()
.into_inner()
.body;
let range_query_result = serde_json::from_slice::<PromJsonResponse>(&json_bytes).unwrap();
let expected = PromJsonResponse {
status: "success".to_string(),
data: PromData {
result_type: "matrix".to_string(),
result: vec![
PromSeries {
metric: [
("__name__".to_string(), "test".to_string()),
("k".to_string(), "a".to_string()),
]
.into_iter()
.collect(),
values: vec![(5.0, "2".to_string()), (10.0, "2".to_string())],
},
PromSeries {
metric: [
("__name__".to_string(), "test".to_string()),
("k".to_string(), "b".to_string()),
]
.into_iter()
.collect(),
values: vec![(5.0, "1".to_string()), (10.0, "1".to_string())],
},
],
},
error: None,
error_type: None,
warnings: None,
};
assert_eq!(range_query_result, expected);
// clean up
let _ = fe_grpc_server.shutdown().await;
guard.remove_all().await;
}