feat: export promql service in server (#924)

* chore: some tiny typo/style fix

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: add promql server

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* works for mocked query

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* integration test case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* resolve CR comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* expose promql api to our http server

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* resolve CR comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* adjust router structure

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-02-03 16:28:56 +08:00
committed by GitHub
parent 184ca78a4d
commit fc9276c79d
24 changed files with 574 additions and 26 deletions

View File

@@ -26,6 +26,7 @@ use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
use frontend::prometheus::PrometheusOptions;
use frontend::promql::PromqlOptions;
use frontend::Plugins;
use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
@@ -72,6 +73,7 @@ pub struct StandaloneOptions {
pub opentsdb_options: Option<OpentsdbOptions>,
pub influxdb_options: Option<InfluxdbOptions>,
pub prometheus_options: Option<PrometheusOptions>,
pub promql_options: Option<PromqlOptions>,
pub mode: Mode,
pub wal: WalConfig,
pub storage: ObjectStoreConfig,
@@ -88,6 +90,7 @@ impl Default for StandaloneOptions {
opentsdb_options: Some(OpentsdbOptions::default()),
influxdb_options: Some(InfluxdbOptions::default()),
prometheus_options: Some(PrometheusOptions::default()),
promql_options: Some(PromqlOptions::default()),
mode: Mode::Standalone,
wal: WalConfig::default(),
storage: ObjectStoreConfig::default(),
@@ -106,6 +109,7 @@ impl StandaloneOptions {
opentsdb_options: self.opentsdb_options,
influxdb_options: self.influxdb_options,
prometheus_options: self.prometheus_options,
promql_options: self.promql_options,
mode: self.mode,
meta_client_opts: None,
}

View File

@@ -13,13 +13,16 @@
// limitations under the License.
use async_trait::async_trait;
use common_error::prelude::BoxedError;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::logging::info;
use common_telemetry::timer;
use query::parser::{QueryLanguageParser, QueryStatement};
use servers::error as server_error;
use servers::promql::PromqlHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef;
use session::context::{QueryContext, QueryContextRef};
use snafu::prelude::*;
use sql::ast::ObjectName;
use sql::statements::statement::Statement;
@@ -209,6 +212,16 @@ impl SqlQueryHandler for Instance {
vec![result]
}
async fn do_promql_query(
&self,
query: &str,
query_ctx: QueryContextRef,
) -> Vec<Result<Output>> {
let _timer = timer!(metric::METRIC_HANDLE_PROMQL_ELAPSED);
let result = self.execute_promql(query, query_ctx).await;
vec![result]
}
async fn do_statement_query(
&self,
stmt: Statement,
@@ -227,6 +240,17 @@ impl SqlQueryHandler for Instance {
}
}
#[async_trait]
impl PromqlHandler for Instance {
async fn do_query(&self, query: &str) -> server_error::Result<Output> {
let _timer = timer!(metric::METRIC_HANDLE_PROMQL_ELAPSED);
self.execute_promql(query, QueryContext::arc())
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu { query })
}
}
#[cfg(test)]
mod test {
use std::sync::Arc;

View File

@@ -17,3 +17,4 @@
pub const METRIC_HANDLE_SQL_ELAPSED: &str = "datanode.handle_sql_elapsed";
pub const METRIC_HANDLE_SCRIPTS_ELAPSED: &str = "datanode.handle_scripts_elapsed";
pub const METRIC_RUN_SCRIPT_ELAPSED: &str = "datanode.run_script_elapsed";
pub const METRIC_HANDLE_PROMQL_ELAPSED: &str = "datanode.handle_promql_elapsed";

View File

@@ -336,6 +336,14 @@ pub enum Error {
#[snafu(backtrace)]
source: partition::error::Error,
},
// TODO(ruihang): merge all query execution error kinds
#[snafu(display("failed to execute PromQL query {}, source: {}", query, source))]
ExecutePromql {
query: String,
#[snafu(backtrace)]
source: servers::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -351,6 +359,7 @@ impl ErrorExt for Error {
Error::NotSupported { .. } => StatusCode::Unsupported,
Error::RuntimeResource { source, .. } => source.status_code(),
Error::ExecutePromql { source, .. } => source.status_code(),
Error::SqlExecIntercepted { source, .. } => source.status_code(),
Error::StartServer { source, .. } => source.status_code(),

View File

@@ -28,6 +28,7 @@ use crate::mysql::MysqlOptions;
use crate::opentsdb::OpentsdbOptions;
use crate::postgres::PostgresOptions;
use crate::prometheus::PrometheusOptions;
use crate::promql::PromqlOptions;
use crate::server::Services;
use crate::Plugins;
@@ -41,6 +42,7 @@ pub struct FrontendOptions {
pub opentsdb_options: Option<OpentsdbOptions>,
pub influxdb_options: Option<InfluxdbOptions>,
pub prometheus_options: Option<PrometheusOptions>,
pub promql_options: Option<PromqlOptions>,
pub mode: Mode,
pub meta_client_opts: Option<MetaClientOpts>,
}
@@ -55,6 +57,7 @@ impl Default for FrontendOptions {
opentsdb_options: Some(OpentsdbOptions::default()),
influxdb_options: Some(InfluxdbOptions::default()),
prometheus_options: Some(PrometheusOptions::default()),
promql_options: Some(PromqlOptions::default()),
mode: Mode::Standalone,
meta_client_opts: None,
}

View File

@@ -43,6 +43,7 @@ use partition::manager::PartitionRuleManager;
use partition::route::TableRoutes;
use servers::error as server_error;
use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
use servers::promql::{PromqlHandler, PromqlHandlerRef};
use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef};
use servers::query_handler::sql::{SqlQueryHandler, SqlQueryHandlerRef};
use servers::query_handler::{
@@ -57,7 +58,9 @@ use sql::statements::statement::Statement;
use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
use crate::error::{self, Error, MissingMetasrvOptsSnafu, Result};
use crate::error::{
self, Error, ExecutePromqlSnafu, MissingMetasrvOptsSnafu, NotSupportedSnafu, Result,
};
use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
use crate::frontend::FrontendOptions;
use crate::instance::standalone::{StandaloneGrpcQueryHandler, StandaloneSqlQueryHandler};
@@ -71,6 +74,7 @@ pub trait FrontendInstance:
+ InfluxdbLineProtocolHandler
+ PrometheusProtocolHandler
+ ScriptHandler
+ PromqlHandler
+ Send
+ Sync
+ 'static
@@ -88,6 +92,7 @@ pub struct Instance {
script_handler: Option<ScriptHandlerRef>,
sql_handler: SqlQueryHandlerRef<Error>,
grpc_query_handler: GrpcQueryHandlerRef<Error>,
promql_handler: Option<PromqlHandlerRef>,
create_expr_factory: CreateExprFactoryRef,
@@ -123,6 +128,7 @@ impl Instance {
create_expr_factory: Arc::new(DefaultCreateExprFactory),
sql_handler: dist_instance.clone(),
grpc_query_handler: dist_instance,
promql_handler: None,
plugins: Default::default(),
})
}
@@ -164,6 +170,7 @@ impl Instance {
create_expr_factory: Arc::new(DefaultCreateExprFactory),
sql_handler: StandaloneSqlQueryHandler::arc(dn_instance.clone()),
grpc_query_handler: StandaloneGrpcQueryHandler::arc(dn_instance.clone()),
promql_handler: Some(dn_instance.clone()),
plugins: Default::default(),
}
}
@@ -176,6 +183,7 @@ impl Instance {
create_expr_factory: Arc::new(DefaultCreateExprFactory),
sql_handler: dist_instance.clone(),
grpc_query_handler: dist_instance,
promql_handler: None,
plugins: Default::default(),
}
}
@@ -447,6 +455,21 @@ impl SqlQueryHandler for Instance {
}
}
async fn do_promql_query(&self, query: &str, _: QueryContextRef) -> Vec<Result<Output>> {
if let Some(handler) = &self.promql_handler {
let result = handler
.do_query(query)
.await
.context(ExecutePromqlSnafu { query });
vec![result]
} else {
vec![Err(NotSupportedSnafu {
feat: "PromQL Query",
}
.build())]
}
}
async fn do_statement_query(
&self,
stmt: Statement,
@@ -496,6 +519,20 @@ impl ScriptHandler for Instance {
}
}
#[async_trait]
impl PromqlHandler for Instance {
async fn do_query(&self, query: &str) -> server_error::Result<Output> {
if let Some(promql_handler) = &self.promql_handler {
promql_handler.do_query(query).await
} else {
server_error::NotSupportedSnafu {
feat: "PromQL query in Frontend",
}
.fail()
}
}
}
#[cfg(test)]
mod tests {
use std::borrow::Cow;

View File

@@ -395,6 +395,14 @@ impl SqlQueryHandler for DistInstance {
self.handle_sql(query, query_ctx).await
}
async fn do_promql_query(
&self,
_: &str,
_: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>> {
unimplemented!()
}
async fn do_statement_query(
&self,
stmt: Statement,

View File

@@ -47,6 +47,14 @@ impl SqlQueryHandler for StandaloneSqlQueryHandler {
.collect()
}
async fn do_promql_query(
&self,
_: &str,
_: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>> {
unimplemented!()
}
async fn do_statement_query(
&self,
stmt: Statement,

View File

@@ -28,6 +28,7 @@ pub mod mysql;
pub mod opentsdb;
pub mod postgres;
pub mod prometheus;
pub mod promql;
mod server;
mod sql;
mod table;

View File

@@ -0,0 +1,39 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PromqlOptions {
pub addr: String,
}
impl Default for PromqlOptions {
fn default() -> Self {
Self {
addr: "127.0.0.1:4004".to_string(),
}
}
}
#[cfg(test)]
mod tests {
use super::PromqlOptions;
#[test]
fn test_prometheus_options() {
let default = PromqlOptions::default();
assert_eq!(default.addr, "127.0.0.1:4004".to_string());
}
}

View File

@@ -24,6 +24,7 @@ use servers::http::HttpServer;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
use servers::opentsdb::OpentsdbServer;
use servers::postgres::PostgresServer;
use servers::promql::PromqlServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor;
use servers::server::Server;
@@ -154,7 +155,7 @@ impl Services {
ServerSqlQueryHandlerAdaptor::arc(instance.clone()),
http_options.clone(),
);
if let Some(user_provider) = user_provider {
if let Some(user_provider) = user_provider.clone() {
http_server.set_user_provider(user_provider);
}
@@ -181,12 +182,26 @@ impl Services {
None
};
let promql_server_and_addr = if let Some(promql_options) = &opts.promql_options {
let promql_addr = parse_addr(&promql_options.addr)?;
let mut promql_server = PromqlServer::create_server(instance.clone());
if let Some(user_provider) = user_provider {
promql_server.set_user_provider(user_provider);
}
Some((promql_server as _, promql_addr))
} else {
None
};
try_join!(
start_server(http_server_and_addr),
start_server(grpc_server_and_addr),
start_server(mysql_server_and_addr),
start_server(postgres_server_and_addr),
start_server(opentsdb_server_and_addr)
start_server(opentsdb_server_and_addr),
start_server(promql_server_and_addr),
)
.context(error::StartServerSnafu)?;
Ok(())

View File

@@ -81,7 +81,7 @@ impl Server for GrpcServer {
.await
.context(TcpBindSnafu { addr })?;
let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
info!("GRPC server is bound to {}", addr);
info!("gRPC server is bound to {}", addr);
*shutdown_tx = Some(tx);

View File

@@ -399,8 +399,8 @@ impl HttpServer {
pub fn make_app(&self) -> Router {
let mut api = OpenApi {
info: Info {
title: "Greptime DB HTTP API".to_string(),
description: Some("HTTP APIs to interact with Greptime DB".to_string()),
title: "GreptimeDB HTTP API".to_string(),
description: Some("HTTP APIs to interact with GreptimeDB".to_string()),
version: HTTP_API_VERSION.to_string(),
..Info::default()
},
@@ -470,6 +470,11 @@ impl HttpServer {
apirouting::get_with(handler::sql, handler::sql_docs)
.post_with(handler::sql, handler::sql_docs),
)
.api_route(
"/promql",
apirouting::get_with(handler::promql, handler::sql_docs)
.post_with(handler::promql, handler::sql_docs),
)
.api_route("/scripts", apirouting::post(script::scripts))
.api_route("/run-script", apirouting::post(script::run_script))
.route("/private/api.json", apirouting::get(serve_api))
@@ -577,6 +582,14 @@ mod test {
unimplemented!()
}
async fn do_promql_query(
&self,
_: &str,
_: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>> {
unimplemented!()
}
async fn do_statement_query(
&self,
_stmt: sql::statements::statement::Statement,

View File

@@ -59,6 +59,32 @@ pub async fn sql(
Json(resp.with_execution_time(start.elapsed().as_millis()))
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct PromqlQuery {
pub query: String,
}
/// Handler to execute promql
#[axum_macros::debug_handler]
pub async fn promql(
State(state): State<ApiState>,
Query(params): Query<PromqlQuery>,
// TODO(fys): pass _user_info into query context
_user_info: Extension<UserInfo>,
) -> Json<JsonResponse> {
let sql_handler = &state.sql_handler;
let start = Instant::now();
let resp = match super::query_context_from_db(sql_handler.clone(), None) {
Ok(query_ctx) => {
JsonResponse::from_output(sql_handler.do_promql_query(&params.query, query_ctx).await)
.await
}
Err(resp) => resp,
};
Json(resp.with_execution_time(start.elapsed().as_millis()))
}
pub(crate) fn sql_docs(op: TransformOperation) -> TransformOperation {
op.response::<200, Json<JsonResponse>>()
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
#![feature(assert_matches)]
#![feature(try_blocks)]
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use serde::{Deserialize, Serialize};
@@ -28,11 +29,11 @@ pub mod mysql;
pub mod opentsdb;
pub mod postgres;
pub mod prometheus;
pub mod promql;
pub mod query_handler;
pub mod server;
pub mod tls;
mod shutdown;
pub mod tls;
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]

255
src/servers/src/promql.rs Normal file
View File

@@ -0,0 +1,255 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use async_trait::async_trait;
use axum::body::BoxBody;
use axum::extract::{Query, State};
use axum::{routing, Json, Router};
use common_error::prelude::ErrorExt;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::info;
use futures::FutureExt;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use tokio::sync::oneshot::Sender;
use tokio::sync::{oneshot, Mutex};
use tower::ServiceBuilder;
use tower_http::auth::AsyncRequireAuthorizationLayer;
use tower_http::trace::TraceLayer;
use crate::auth::UserProviderRef;
use crate::error::{AlreadyStartedSnafu, CollectRecordbatchSnafu, Result, StartHttpSnafu};
use crate::http::authorize::HttpAuth;
use crate::server::Server;
pub const PROMQL_API_VERSION: &str = "v1";
pub type PromqlHandlerRef = Arc<dyn PromqlHandler + Send + Sync>;
#[async_trait]
pub trait PromqlHandler {
async fn do_query(&self, query: &str) -> Result<Output>;
}
pub struct PromqlServer {
query_handler: PromqlHandlerRef,
shutdown_tx: Mutex<Option<Sender<()>>>,
user_provider: Option<UserProviderRef>,
}
impl PromqlServer {
pub fn create_server(query_handler: PromqlHandlerRef) -> Box<Self> {
Box::new(PromqlServer {
query_handler,
shutdown_tx: Mutex::new(None),
user_provider: None,
})
}
pub fn set_user_provider(&mut self, user_provider: UserProviderRef) {
debug_assert!(self.user_provider.is_none());
self.user_provider = Some(user_provider);
}
pub fn make_app(&self) -> Router {
// TODO(ruihang): implement format_query, series, labels, values, query_examplars and targets methods
let router = Router::new()
.route("/query", routing::post(instant_query).get(instant_query))
.route("/range_query", routing::post(range_query).get(range_query))
.with_state(self.query_handler.clone());
Router::new()
.nest(&format!("/{PROMQL_API_VERSION}"), router)
// middlewares
.layer(
ServiceBuilder::new()
.layer(TraceLayer::new_for_http())
// custom layer
.layer(AsyncRequireAuthorizationLayer::new(
HttpAuth::<BoxBody>::new(self.user_provider.clone()),
)),
)
}
}
#[async_trait]
impl Server for PromqlServer {
async fn shutdown(&self) -> Result<()> {
let mut shutdown_tx = self.shutdown_tx.lock().await;
if let Some(tx) = shutdown_tx.take() {
if tx.send(()).is_err() {
info!("Receiver dropped, the PromQl server has already existed");
}
}
info!("Shutdown PromQL server");
Ok(())
}
async fn start(&self, listening: SocketAddr) -> Result<SocketAddr> {
let (tx, rx) = oneshot::channel();
let server = {
let mut shutdown_tx = self.shutdown_tx.lock().await;
ensure!(
shutdown_tx.is_none(),
AlreadyStartedSnafu { server: "PromQL" }
);
let app = self.make_app();
let server = axum::Server::bind(&listening).serve(app.into_make_service());
*shutdown_tx = Some(tx);
server
};
let listening = server.local_addr();
info!("PromQL server is bound to {}", listening);
let graceful = server.with_graceful_shutdown(rx.map(drop));
graceful.await.context(StartHttpSnafu)?;
Ok(listening)
}
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct PromqlSeries {
metric: HashMap<String, String>,
values: Vec<(i64, String)>,
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct PromqlData {
#[serde(rename = "resultType")]
result_type: String,
result: Vec<PromqlSeries>,
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct PromqlJsonResponse {
status: String,
data: PromqlData,
error: Option<String>,
error_type: Option<String>,
warnings: Option<Vec<String>>,
}
impl PromqlJsonResponse {
pub fn error<S1, S2>(error_type: S1, reason: S2) -> Json<Self>
where
S1: Into<String>,
S2: Into<String>,
{
Json(PromqlJsonResponse {
status: "error".to_string(),
data: PromqlData::default(),
error: Some(reason.into()),
error_type: Some(error_type.into()),
warnings: None,
})
}
pub fn success(data: PromqlData) -> Json<Self> {
Json(PromqlJsonResponse {
status: "success".to_string(),
data,
error: None,
error_type: None,
warnings: None,
})
}
/// Convert from `Result<Output>`
pub async fn from_query_result(result: Result<Output>) -> Json<Self> {
let response: Result<Json<Self>> = try {
let json = match result? {
Output::RecordBatches(batches) => {
Self::success(Self::record_batches_to_data(batches)?)
}
Output::Stream(stream) => {
let record_batches = RecordBatches::try_collect(stream)
.await
.context(CollectRecordbatchSnafu)?;
Self::success(Self::record_batches_to_data(record_batches)?)
}
Output::AffectedRows(_) => Self::error(
"unexpected result",
"expected data result, but got affected rows",
),
};
json
};
response.unwrap_or_else(|err| Self::error(err.status_code().to_string(), err.to_string()))
}
/// TODO(ruihang): implement this conversion method
fn record_batches_to_data(_: RecordBatches) -> Result<PromqlData> {
let data = PromqlData {
result_type: "matrix".to_string(),
result: vec![PromqlSeries {
metric: vec![("__name__".to_string(), "foo".to_string())]
.into_iter()
.collect(),
values: vec![(1, "123.45".to_string())],
}],
};
Ok(data)
}
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct InstantQuery {
query: String,
time: Option<String>,
timeout: Option<String>,
}
#[axum_macros::debug_handler]
pub async fn instant_query(
State(_handler): State<PromqlHandlerRef>,
Query(_params): Query<InstantQuery>,
) -> Json<PromqlJsonResponse> {
PromqlJsonResponse::error(
"not implemented",
"instant query api `/query` is not implemented. Use `/range_query` instead.",
)
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct RangeQuery {
query: String,
start: String,
end: String,
step: String,
timeout: Option<String>,
}
#[axum_macros::debug_handler]
pub async fn range_query(
State(handler): State<PromqlHandlerRef>,
Query(params): Query<RangeQuery>,
) -> Json<PromqlJsonResponse> {
let result = handler.do_query(&params.query).await;
PromqlJsonResponse::from_query_result(result).await
}

View File

@@ -12,6 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! All query handler traits for various request protocols, like SQL or GRPC.
//! Instance that wishes to support certain request protocol, just implement the corresponding
//! trait, the Server will handle codec for you.
//!
//! Note:
//! Query handlers are not confined to only handle read requests, they are expecting to handle
//! write requests too. So the "query" here not might seem ambiguity. However, "query" has been
//! used as some kind of "convention", it's the "Q" in "SQL". So we might better stick to the
//! word "query".
pub mod grpc;
pub mod sql;
@@ -27,16 +37,6 @@ use crate::influxdb::InfluxdbRequest;
use crate::opentsdb::codec::DataPoint;
use crate::prometheus::Metrics;
/// All query handler traits for various request protocols, like SQL or GRPC.
/// Instance that wishes to support certain request protocol, just implement the corresponding
/// trait, the Server will handle codec for you.
///
/// Note:
/// Query handlers are not confined to only handle read requests, they are expecting to handle
/// write requests too. So the "query" here not might seem ambiguity. However, "query" has been
/// used as some kind of "convention", it's the "Q" in "SQL". So we might better stick to the
/// word "query".
pub type OpentsdbProtocolHandlerRef = Arc<dyn OpentsdbProtocolHandler + Send + Sync>;
pub type InfluxdbLineProtocolHandlerRef = Arc<dyn InfluxdbLineProtocolHandler + Send + Sync>;
pub type PrometheusProtocolHandlerRef = Arc<dyn PrometheusProtocolHandler + Send + Sync>;

View File

@@ -35,6 +35,12 @@ pub trait SqlQueryHandler {
query_ctx: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>>;
async fn do_promql_query(
&self,
query: &str,
query_ctx: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>>;
async fn do_statement_query(
&self,
stmt: Statement,
@@ -75,6 +81,22 @@ where
.collect()
}
async fn do_promql_query(
&self,
query: &str,
query_ctx: QueryContextRef,
) -> Vec<Result<Output>> {
self.0
.do_promql_query(query, query_ctx)
.await
.into_iter()
.map(|x| {
x.map_err(BoxedError::new)
.context(error::ExecuteQuerySnafu { query })
})
.collect()
}
async fn do_statement_query(
&self,
stmt: Statement,

View File

@@ -54,6 +54,14 @@ impl SqlQueryHandler for DummyInstance {
unimplemented!()
}
async fn do_promql_query(
&self,
_: &str,
_: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>> {
unimplemented!()
}
async fn do_statement_query(
&self,
_stmt: sql::statements::statement::Statement,

View File

@@ -52,6 +52,14 @@ impl SqlQueryHandler for DummyInstance {
unimplemented!()
}
async fn do_promql_query(
&self,
_: &str,
_: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>> {
unimplemented!()
}
async fn do_statement_query(
&self,
_stmt: sql::statements::statement::Statement,

View File

@@ -77,6 +77,14 @@ impl SqlQueryHandler for DummyInstance {
unimplemented!()
}
async fn do_promql_query(
&self,
_: &str,
_: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>> {
unimplemented!()
}
async fn do_statement_query(
&self,
_stmt: sql::statements::statement::Statement,

View File

@@ -68,6 +68,14 @@ impl SqlQueryHandler for DummyInstance {
vec![Ok(output)]
}
async fn do_promql_query(
&self,
_: &str,
_: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>> {
unimplemented!()
}
async fn do_statement_query(
&self,
_stmt: sql::statements::statement::Statement,

View File

@@ -40,6 +40,7 @@ use once_cell::sync::OnceCell;
use rand::Rng;
use servers::grpc::GrpcServer;
use servers::http::{HttpOptions, HttpServer};
use servers::promql::PromqlServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor;
use servers::server::Server;
@@ -265,7 +266,7 @@ async fn build_frontend_instance(datanode_instance: InstanceRef) -> FeInstance {
frontend_instance
}
pub async fn setup_test_app(store_type: StorageType, name: &str) -> (Router, TestGuard) {
pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router, TestGuard) {
let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
instance.start().await.unwrap();
@@ -283,7 +284,7 @@ pub async fn setup_test_app(store_type: StorageType, name: &str) -> (Router, Tes
(http_server.make_app(), guard)
}
pub async fn setup_test_app_with_frontend(
pub async fn setup_test_http_app_with_frontend(
store_type: StorageType,
name: &str,
) -> (Router, TestGuard) {
@@ -307,6 +308,26 @@ pub async fn setup_test_app_with_frontend(
(app, guard)
}
pub async fn setup_test_promql_app_with_frontend(
store_type: StorageType,
name: &str,
) -> (Router, TestGuard) {
let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name);
let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap());
let frontend = build_frontend_instance(instance.clone()).await;
instance.start().await.unwrap();
create_test_table(
frontend.catalog_manager(),
instance.sql_handler(),
ConcreteDataType::timestamp_millisecond_datatype(),
)
.await
.unwrap();
let promql_server = PromqlServer::create_server(Arc::new(frontend) as _);
let app = promql_server.make_app();
(app, guard)
}
pub async fn setup_grpc_server(
store_type: StorageType,
name: &str,

View File

@@ -18,7 +18,10 @@ use common_error::status_code::StatusCode as ErrorCode;
use serde_json::json;
use servers::http::handler::HealthResponse;
use servers::http::{JsonOutput, JsonResponse};
use tests_integration::test_util::{setup_test_app, setup_test_app_with_frontend, StorageType};
use tests_integration::test_util::{
setup_test_http_app, setup_test_http_app_with_frontend, setup_test_promql_app_with_frontend,
StorageType,
};
#[macro_export]
macro_rules! http_test {
@@ -50,6 +53,7 @@ macro_rules! http_tests {
$service,
test_sql_api,
test_promql_api,
test_metrics_api,
test_scripts_api,
test_health_api,
@@ -60,7 +64,7 @@ macro_rules! http_tests {
pub async fn test_sql_api(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_app_with_frontend(store_type, "sql_api").await;
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "sql_api").await;
let client = TestClient::new(app);
let res = client.get("/v1/sql").send().await;
assert_eq!(res.status(), StatusCode::OK);
@@ -261,10 +265,35 @@ pub async fn test_sql_api(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_promql_api(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_promql_app_with_frontend(store_type, "promql_api").await;
let client = TestClient::new(app);
// instant query
let res = client.get("/v1/query?query=up").send().await;
assert_eq!(res.status(), StatusCode::OK);
let res = client.post("/v1/query?query=up").send().await;
assert_eq!(res.status(), StatusCode::OK);
let res = client
.get("/v1/range_query?query=up&start=1&end=100&step=5")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let res = client
.post("/v1/range_query?query=up&start=1&end=100&step=5")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
guard.remove_all().await;
}
pub async fn test_metrics_api(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
common_telemetry::init_default_metrics_recorder();
let (app, mut guard) = setup_test_app(store_type, "metrics_api").await;
let (app, mut guard) = setup_test_http_app(store_type, "metrics_api").await;
let client = TestClient::new(app);
// Send a sql
@@ -284,7 +313,7 @@ pub async fn test_metrics_api(store_type: StorageType) {
pub async fn test_scripts_api(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_app_with_frontend(store_type, "script_api").await;
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "script_api").await;
let client = TestClient::new(app);
let res = client
@@ -325,7 +354,7 @@ def test(n):
pub async fn test_health_api(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, _guard) = setup_test_app_with_frontend(store_type, "health_api").await;
let (app, _guard) = setup_test_http_app_with_frontend(store_type, "health_api").await;
let client = TestClient::new(app);
// we can call health api with both `GET` and `POST` method.