From fc9276c79d7a5888ffaa5f0755ba7ae58016f0f3 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 3 Feb 2023 16:28:56 +0800 Subject: [PATCH] feat: export promql service in server (#924) * chore: some tiny typo/style fix Signed-off-by: Ruihang Xia * feat: add promql server Signed-off-by: Ruihang Xia * works for mocked query Signed-off-by: Ruihang Xia * clean up Signed-off-by: Ruihang Xia * integration test case Signed-off-by: Ruihang Xia * resolve CR comments Signed-off-by: Ruihang Xia * expose promql api to our http server Signed-off-by: Ruihang Xia * resolve CR comments Signed-off-by: Ruihang Xia * adjust router structure Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/cmd/src/standalone.rs | 4 + src/datanode/src/instance/sql.rs | 26 ++- src/datanode/src/metric.rs | 1 + src/frontend/src/error.rs | 9 + src/frontend/src/frontend.rs | 3 + src/frontend/src/instance.rs | 39 +++- src/frontend/src/instance/distributed.rs | 8 + src/frontend/src/instance/standalone.rs | 8 + src/frontend/src/lib.rs | 1 + src/frontend/src/promql.rs | 39 ++++ src/frontend/src/server.rs | 19 +- src/servers/src/grpc.rs | 2 +- src/servers/src/http.rs | 17 +- src/servers/src/http/handler.rs | 26 +++ src/servers/src/lib.rs | 5 +- src/servers/src/promql.rs | 255 ++++++++++++++++++++++ src/servers/src/query_handler.rs | 20 +- src/servers/src/query_handler/sql.rs | 22 ++ src/servers/tests/http/influxdb_test.rs | 8 + src/servers/tests/http/opentsdb_test.rs | 8 + src/servers/tests/http/prometheus_test.rs | 8 + src/servers/tests/mod.rs | 8 + tests-integration/src/test_util.rs | 25 ++- tests-integration/tests/http.rs | 39 +++- 24 files changed, 574 insertions(+), 26 deletions(-) create mode 100644 src/frontend/src/promql.rs create mode 100644 src/servers/src/promql.rs diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 26d30cbbf2..cb4efc072d 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -26,6 +26,7 @@ use frontend::mysql::MysqlOptions; use frontend::opentsdb::OpentsdbOptions; use frontend::postgres::PostgresOptions; use frontend::prometheus::PrometheusOptions; +use frontend::promql::PromqlOptions; use frontend::Plugins; use serde::{Deserialize, Serialize}; use servers::http::HttpOptions; @@ -72,6 +73,7 @@ pub struct StandaloneOptions { pub opentsdb_options: Option, pub influxdb_options: Option, pub prometheus_options: Option, + pub promql_options: Option, pub mode: Mode, pub wal: WalConfig, pub storage: ObjectStoreConfig, @@ -88,6 +90,7 @@ impl Default for StandaloneOptions { opentsdb_options: Some(OpentsdbOptions::default()), influxdb_options: Some(InfluxdbOptions::default()), prometheus_options: Some(PrometheusOptions::default()), + promql_options: Some(PromqlOptions::default()), mode: Mode::Standalone, wal: WalConfig::default(), storage: ObjectStoreConfig::default(), @@ -106,6 +109,7 @@ impl StandaloneOptions { opentsdb_options: self.opentsdb_options, influxdb_options: self.influxdb_options, prometheus_options: self.prometheus_options, + promql_options: self.promql_options, mode: self.mode, meta_client_opts: None, } diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs index 9e28c1802f..d7862f8762 100644 --- a/src/datanode/src/instance/sql.rs +++ b/src/datanode/src/instance/sql.rs @@ -13,13 +13,16 @@ // limitations under the License. use async_trait::async_trait; +use common_error::prelude::BoxedError; use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::logging::info; use common_telemetry::timer; use query::parser::{QueryLanguageParser, QueryStatement}; +use servers::error as server_error; +use servers::promql::PromqlHandler; use servers::query_handler::sql::SqlQueryHandler; -use session::context::QueryContextRef; +use session::context::{QueryContext, QueryContextRef}; use snafu::prelude::*; use sql::ast::ObjectName; use sql::statements::statement::Statement; @@ -209,6 +212,16 @@ impl SqlQueryHandler for Instance { vec![result] } + async fn do_promql_query( + &self, + query: &str, + query_ctx: QueryContextRef, + ) -> Vec> { + let _timer = timer!(metric::METRIC_HANDLE_PROMQL_ELAPSED); + let result = self.execute_promql(query, query_ctx).await; + vec![result] + } + async fn do_statement_query( &self, stmt: Statement, @@ -227,6 +240,17 @@ impl SqlQueryHandler for Instance { } } +#[async_trait] +impl PromqlHandler for Instance { + async fn do_query(&self, query: &str) -> server_error::Result { + let _timer = timer!(metric::METRIC_HANDLE_PROMQL_ELAPSED); + self.execute_promql(query, QueryContext::arc()) + .await + .map_err(BoxedError::new) + .with_context(|_| server_error::ExecuteQuerySnafu { query }) + } +} + #[cfg(test)] mod test { use std::sync::Arc; diff --git a/src/datanode/src/metric.rs b/src/datanode/src/metric.rs index 22cb05b2c5..88f50b5ed4 100644 --- a/src/datanode/src/metric.rs +++ b/src/datanode/src/metric.rs @@ -17,3 +17,4 @@ pub const METRIC_HANDLE_SQL_ELAPSED: &str = "datanode.handle_sql_elapsed"; pub const METRIC_HANDLE_SCRIPTS_ELAPSED: &str = "datanode.handle_scripts_elapsed"; pub const METRIC_RUN_SCRIPT_ELAPSED: &str = "datanode.run_script_elapsed"; +pub const METRIC_HANDLE_PROMQL_ELAPSED: &str = "datanode.handle_promql_elapsed"; diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index d45f34eed1..7624021e80 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -336,6 +336,14 @@ pub enum Error { #[snafu(backtrace)] source: partition::error::Error, }, + + // TODO(ruihang): merge all query execution error kinds + #[snafu(display("failed to execute PromQL query {}, source: {}", query, source))] + ExecutePromql { + query: String, + #[snafu(backtrace)] + source: servers::error::Error, + }, } pub type Result = std::result::Result; @@ -351,6 +359,7 @@ impl ErrorExt for Error { Error::NotSupported { .. } => StatusCode::Unsupported, Error::RuntimeResource { source, .. } => source.status_code(), + Error::ExecutePromql { source, .. } => source.status_code(), Error::SqlExecIntercepted { source, .. } => source.status_code(), Error::StartServer { source, .. } => source.status_code(), diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index ad92b65dff..899c5929bc 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -28,6 +28,7 @@ use crate::mysql::MysqlOptions; use crate::opentsdb::OpentsdbOptions; use crate::postgres::PostgresOptions; use crate::prometheus::PrometheusOptions; +use crate::promql::PromqlOptions; use crate::server::Services; use crate::Plugins; @@ -41,6 +42,7 @@ pub struct FrontendOptions { pub opentsdb_options: Option, pub influxdb_options: Option, pub prometheus_options: Option, + pub promql_options: Option, pub mode: Mode, pub meta_client_opts: Option, } @@ -55,6 +57,7 @@ impl Default for FrontendOptions { opentsdb_options: Some(OpentsdbOptions::default()), influxdb_options: Some(InfluxdbOptions::default()), prometheus_options: Some(PrometheusOptions::default()), + promql_options: Some(PromqlOptions::default()), mode: Mode::Standalone, meta_client_opts: None, } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 7018f14828..6a6010d153 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -43,6 +43,7 @@ use partition::manager::PartitionRuleManager; use partition::route::TableRoutes; use servers::error as server_error; use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef}; +use servers::promql::{PromqlHandler, PromqlHandlerRef}; use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef}; use servers::query_handler::sql::{SqlQueryHandler, SqlQueryHandlerRef}; use servers::query_handler::{ @@ -57,7 +58,9 @@ use sql::statements::statement::Statement; use crate::catalog::FrontendCatalogManager; use crate::datanode::DatanodeClients; -use crate::error::{self, Error, MissingMetasrvOptsSnafu, Result}; +use crate::error::{ + self, Error, ExecutePromqlSnafu, MissingMetasrvOptsSnafu, NotSupportedSnafu, Result, +}; use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory}; use crate::frontend::FrontendOptions; use crate::instance::standalone::{StandaloneGrpcQueryHandler, StandaloneSqlQueryHandler}; @@ -71,6 +74,7 @@ pub trait FrontendInstance: + InfluxdbLineProtocolHandler + PrometheusProtocolHandler + ScriptHandler + + PromqlHandler + Send + Sync + 'static @@ -88,6 +92,7 @@ pub struct Instance { script_handler: Option, sql_handler: SqlQueryHandlerRef, grpc_query_handler: GrpcQueryHandlerRef, + promql_handler: Option, create_expr_factory: CreateExprFactoryRef, @@ -123,6 +128,7 @@ impl Instance { create_expr_factory: Arc::new(DefaultCreateExprFactory), sql_handler: dist_instance.clone(), grpc_query_handler: dist_instance, + promql_handler: None, plugins: Default::default(), }) } @@ -164,6 +170,7 @@ impl Instance { create_expr_factory: Arc::new(DefaultCreateExprFactory), sql_handler: StandaloneSqlQueryHandler::arc(dn_instance.clone()), grpc_query_handler: StandaloneGrpcQueryHandler::arc(dn_instance.clone()), + promql_handler: Some(dn_instance.clone()), plugins: Default::default(), } } @@ -176,6 +183,7 @@ impl Instance { create_expr_factory: Arc::new(DefaultCreateExprFactory), sql_handler: dist_instance.clone(), grpc_query_handler: dist_instance, + promql_handler: None, plugins: Default::default(), } } @@ -447,6 +455,21 @@ impl SqlQueryHandler for Instance { } } + async fn do_promql_query(&self, query: &str, _: QueryContextRef) -> Vec> { + if let Some(handler) = &self.promql_handler { + let result = handler + .do_query(query) + .await + .context(ExecutePromqlSnafu { query }); + vec![result] + } else { + vec![Err(NotSupportedSnafu { + feat: "PromQL Query", + } + .build())] + } + } + async fn do_statement_query( &self, stmt: Statement, @@ -496,6 +519,20 @@ impl ScriptHandler for Instance { } } +#[async_trait] +impl PromqlHandler for Instance { + async fn do_query(&self, query: &str) -> server_error::Result { + if let Some(promql_handler) = &self.promql_handler { + promql_handler.do_query(query).await + } else { + server_error::NotSupportedSnafu { + feat: "PromQL query in Frontend", + } + .fail() + } + } +} + #[cfg(test)] mod tests { use std::borrow::Cow; diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index bd9350b15b..c7d7f8c13b 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -395,6 +395,14 @@ impl SqlQueryHandler for DistInstance { self.handle_sql(query, query_ctx).await } + async fn do_promql_query( + &self, + _: &str, + _: QueryContextRef, + ) -> Vec> { + unimplemented!() + } + async fn do_statement_query( &self, stmt: Statement, diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index fdfa92b72b..6138727e94 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -47,6 +47,14 @@ impl SqlQueryHandler for StandaloneSqlQueryHandler { .collect() } + async fn do_promql_query( + &self, + _: &str, + _: QueryContextRef, + ) -> Vec> { + unimplemented!() + } + async fn do_statement_query( &self, stmt: Statement, diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index d34f24c8a1..36fa94ec1f 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -28,6 +28,7 @@ pub mod mysql; pub mod opentsdb; pub mod postgres; pub mod prometheus; +pub mod promql; mod server; mod sql; mod table; diff --git a/src/frontend/src/promql.rs b/src/frontend/src/promql.rs new file mode 100644 index 0000000000..a2e18a4922 --- /dev/null +++ b/src/frontend/src/promql.rs @@ -0,0 +1,39 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PromqlOptions { + pub addr: String, +} + +impl Default for PromqlOptions { + fn default() -> Self { + Self { + addr: "127.0.0.1:4004".to_string(), + } + } +} + +#[cfg(test)] +mod tests { + use super::PromqlOptions; + + #[test] + fn test_prometheus_options() { + let default = PromqlOptions::default(); + assert_eq!(default.addr, "127.0.0.1:4004".to_string()); + } +} diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index e438d0657b..daff47ed26 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -24,6 +24,7 @@ use servers::http::HttpServer; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; use servers::opentsdb::OpentsdbServer; use servers::postgres::PostgresServer; +use servers::promql::PromqlServer; use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor; use servers::server::Server; @@ -154,7 +155,7 @@ impl Services { ServerSqlQueryHandlerAdaptor::arc(instance.clone()), http_options.clone(), ); - if let Some(user_provider) = user_provider { + if let Some(user_provider) = user_provider.clone() { http_server.set_user_provider(user_provider); } @@ -181,12 +182,26 @@ impl Services { None }; + let promql_server_and_addr = if let Some(promql_options) = &opts.promql_options { + let promql_addr = parse_addr(&promql_options.addr)?; + + let mut promql_server = PromqlServer::create_server(instance.clone()); + if let Some(user_provider) = user_provider { + promql_server.set_user_provider(user_provider); + } + + Some((promql_server as _, promql_addr)) + } else { + None + }; + try_join!( start_server(http_server_and_addr), start_server(grpc_server_and_addr), start_server(mysql_server_and_addr), start_server(postgres_server_and_addr), - start_server(opentsdb_server_and_addr) + start_server(opentsdb_server_and_addr), + start_server(promql_server_and_addr), ) .context(error::StartServerSnafu)?; Ok(()) diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index 4eee7617ca..afb111629f 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -81,7 +81,7 @@ impl Server for GrpcServer { .await .context(TcpBindSnafu { addr })?; let addr = listener.local_addr().context(TcpBindSnafu { addr })?; - info!("GRPC server is bound to {}", addr); + info!("gRPC server is bound to {}", addr); *shutdown_tx = Some(tx); diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index c33b2aedbb..9b0eb8b7c2 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -399,8 +399,8 @@ impl HttpServer { pub fn make_app(&self) -> Router { let mut api = OpenApi { info: Info { - title: "Greptime DB HTTP API".to_string(), - description: Some("HTTP APIs to interact with Greptime DB".to_string()), + title: "GreptimeDB HTTP API".to_string(), + description: Some("HTTP APIs to interact with GreptimeDB".to_string()), version: HTTP_API_VERSION.to_string(), ..Info::default() }, @@ -470,6 +470,11 @@ impl HttpServer { apirouting::get_with(handler::sql, handler::sql_docs) .post_with(handler::sql, handler::sql_docs), ) + .api_route( + "/promql", + apirouting::get_with(handler::promql, handler::sql_docs) + .post_with(handler::promql, handler::sql_docs), + ) .api_route("/scripts", apirouting::post(script::scripts)) .api_route("/run-script", apirouting::post(script::run_script)) .route("/private/api.json", apirouting::get(serve_api)) @@ -577,6 +582,14 @@ mod test { unimplemented!() } + async fn do_promql_query( + &self, + _: &str, + _: QueryContextRef, + ) -> Vec> { + unimplemented!() + } + async fn do_statement_query( &self, _stmt: sql::statements::statement::Statement, diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 0c7daac6b8..0598fe0393 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -59,6 +59,32 @@ pub async fn sql( Json(resp.with_execution_time(start.elapsed().as_millis())) } +#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] +pub struct PromqlQuery { + pub query: String, +} + +/// Handler to execute promql +#[axum_macros::debug_handler] +pub async fn promql( + State(state): State, + Query(params): Query, + // TODO(fys): pass _user_info into query context + _user_info: Extension, +) -> Json { + let sql_handler = &state.sql_handler; + let start = Instant::now(); + let resp = match super::query_context_from_db(sql_handler.clone(), None) { + Ok(query_ctx) => { + JsonResponse::from_output(sql_handler.do_promql_query(¶ms.query, query_ctx).await) + .await + } + Err(resp) => resp, + }; + + Json(resp.with_execution_time(start.elapsed().as_millis())) +} + pub(crate) fn sql_docs(op: TransformOperation) -> TransformOperation { op.response::<200, Json>() } diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index 6233ef1e5c..d759a57bb6 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. #![feature(assert_matches)] +#![feature(try_blocks)] use common_catalog::consts::DEFAULT_CATALOG_NAME; use serde::{Deserialize, Serialize}; @@ -28,11 +29,11 @@ pub mod mysql; pub mod opentsdb; pub mod postgres; pub mod prometheus; +pub mod promql; pub mod query_handler; pub mod server; -pub mod tls; - mod shutdown; +pub mod tls; #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] #[serde(rename_all = "lowercase")] diff --git a/src/servers/src/promql.rs b/src/servers/src/promql.rs new file mode 100644 index 0000000000..45902d5c18 --- /dev/null +++ b/src/servers/src/promql.rs @@ -0,0 +1,255 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::Arc; + +use async_trait::async_trait; +use axum::body::BoxBody; +use axum::extract::{Query, State}; +use axum::{routing, Json, Router}; +use common_error::prelude::ErrorExt; +use common_query::Output; +use common_recordbatch::RecordBatches; +use common_telemetry::info; +use futures::FutureExt; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, ResultExt}; +use tokio::sync::oneshot::Sender; +use tokio::sync::{oneshot, Mutex}; +use tower::ServiceBuilder; +use tower_http::auth::AsyncRequireAuthorizationLayer; +use tower_http::trace::TraceLayer; + +use crate::auth::UserProviderRef; +use crate::error::{AlreadyStartedSnafu, CollectRecordbatchSnafu, Result, StartHttpSnafu}; +use crate::http::authorize::HttpAuth; +use crate::server::Server; + +pub const PROMQL_API_VERSION: &str = "v1"; + +pub type PromqlHandlerRef = Arc; + +#[async_trait] +pub trait PromqlHandler { + async fn do_query(&self, query: &str) -> Result; +} + +pub struct PromqlServer { + query_handler: PromqlHandlerRef, + shutdown_tx: Mutex>>, + user_provider: Option, +} + +impl PromqlServer { + pub fn create_server(query_handler: PromqlHandlerRef) -> Box { + Box::new(PromqlServer { + query_handler, + shutdown_tx: Mutex::new(None), + user_provider: None, + }) + } + + pub fn set_user_provider(&mut self, user_provider: UserProviderRef) { + debug_assert!(self.user_provider.is_none()); + self.user_provider = Some(user_provider); + } + + pub fn make_app(&self) -> Router { + // TODO(ruihang): implement format_query, series, labels, values, query_examplars and targets methods + + let router = Router::new() + .route("/query", routing::post(instant_query).get(instant_query)) + .route("/range_query", routing::post(range_query).get(range_query)) + .with_state(self.query_handler.clone()); + + Router::new() + .nest(&format!("/{PROMQL_API_VERSION}"), router) + // middlewares + .layer( + ServiceBuilder::new() + .layer(TraceLayer::new_for_http()) + // custom layer + .layer(AsyncRequireAuthorizationLayer::new( + HttpAuth::::new(self.user_provider.clone()), + )), + ) + } +} + +#[async_trait] +impl Server for PromqlServer { + async fn shutdown(&self) -> Result<()> { + let mut shutdown_tx = self.shutdown_tx.lock().await; + if let Some(tx) = shutdown_tx.take() { + if tx.send(()).is_err() { + info!("Receiver dropped, the PromQl server has already existed"); + } + } + info!("Shutdown PromQL server"); + + Ok(()) + } + + async fn start(&self, listening: SocketAddr) -> Result { + let (tx, rx) = oneshot::channel(); + let server = { + let mut shutdown_tx = self.shutdown_tx.lock().await; + ensure!( + shutdown_tx.is_none(), + AlreadyStartedSnafu { server: "PromQL" } + ); + + let app = self.make_app(); + let server = axum::Server::bind(&listening).serve(app.into_make_service()); + + *shutdown_tx = Some(tx); + + server + }; + let listening = server.local_addr(); + info!("PromQL server is bound to {}", listening); + + let graceful = server.with_graceful_shutdown(rx.map(drop)); + graceful.await.context(StartHttpSnafu)?; + + Ok(listening) + } +} + +#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] +pub struct PromqlSeries { + metric: HashMap, + values: Vec<(i64, String)>, +} + +#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] +pub struct PromqlData { + #[serde(rename = "resultType")] + result_type: String, + result: Vec, +} + +#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] +pub struct PromqlJsonResponse { + status: String, + data: PromqlData, + error: Option, + error_type: Option, + warnings: Option>, +} + +impl PromqlJsonResponse { + pub fn error(error_type: S1, reason: S2) -> Json + where + S1: Into, + S2: Into, + { + Json(PromqlJsonResponse { + status: "error".to_string(), + data: PromqlData::default(), + error: Some(reason.into()), + error_type: Some(error_type.into()), + warnings: None, + }) + } + + pub fn success(data: PromqlData) -> Json { + Json(PromqlJsonResponse { + status: "success".to_string(), + data, + error: None, + error_type: None, + warnings: None, + }) + } + + /// Convert from `Result` + pub async fn from_query_result(result: Result) -> Json { + let response: Result> = try { + let json = match result? { + Output::RecordBatches(batches) => { + Self::success(Self::record_batches_to_data(batches)?) + } + Output::Stream(stream) => { + let record_batches = RecordBatches::try_collect(stream) + .await + .context(CollectRecordbatchSnafu)?; + Self::success(Self::record_batches_to_data(record_batches)?) + } + Output::AffectedRows(_) => Self::error( + "unexpected result", + "expected data result, but got affected rows", + ), + }; + + json + }; + + response.unwrap_or_else(|err| Self::error(err.status_code().to_string(), err.to_string())) + } + + /// TODO(ruihang): implement this conversion method + fn record_batches_to_data(_: RecordBatches) -> Result { + let data = PromqlData { + result_type: "matrix".to_string(), + result: vec![PromqlSeries { + metric: vec![("__name__".to_string(), "foo".to_string())] + .into_iter() + .collect(), + values: vec![(1, "123.45".to_string())], + }], + }; + + Ok(data) + } +} + +#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] +pub struct InstantQuery { + query: String, + time: Option, + timeout: Option, +} + +#[axum_macros::debug_handler] +pub async fn instant_query( + State(_handler): State, + Query(_params): Query, +) -> Json { + PromqlJsonResponse::error( + "not implemented", + "instant query api `/query` is not implemented. Use `/range_query` instead.", + ) +} + +#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] +pub struct RangeQuery { + query: String, + start: String, + end: String, + step: String, + timeout: Option, +} + +#[axum_macros::debug_handler] +pub async fn range_query( + State(handler): State, + Query(params): Query, +) -> Json { + let result = handler.do_query(¶ms.query).await; + PromqlJsonResponse::from_query_result(result).await +} diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index bc7dd1df82..2a1e59818e 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -12,6 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! All query handler traits for various request protocols, like SQL or GRPC. +//! Instance that wishes to support certain request protocol, just implement the corresponding +//! trait, the Server will handle codec for you. +//! +//! Note: +//! Query handlers are not confined to only handle read requests, they are expecting to handle +//! write requests too. So the "query" here not might seem ambiguity. However, "query" has been +//! used as some kind of "convention", it's the "Q" in "SQL". So we might better stick to the +//! word "query". + pub mod grpc; pub mod sql; @@ -27,16 +37,6 @@ use crate::influxdb::InfluxdbRequest; use crate::opentsdb::codec::DataPoint; use crate::prometheus::Metrics; -/// All query handler traits for various request protocols, like SQL or GRPC. -/// Instance that wishes to support certain request protocol, just implement the corresponding -/// trait, the Server will handle codec for you. -/// -/// Note: -/// Query handlers are not confined to only handle read requests, they are expecting to handle -/// write requests too. So the "query" here not might seem ambiguity. However, "query" has been -/// used as some kind of "convention", it's the "Q" in "SQL". So we might better stick to the -/// word "query". - pub type OpentsdbProtocolHandlerRef = Arc; pub type InfluxdbLineProtocolHandlerRef = Arc; pub type PrometheusProtocolHandlerRef = Arc; diff --git a/src/servers/src/query_handler/sql.rs b/src/servers/src/query_handler/sql.rs index 0b82ae134f..d394e84d64 100644 --- a/src/servers/src/query_handler/sql.rs +++ b/src/servers/src/query_handler/sql.rs @@ -35,6 +35,12 @@ pub trait SqlQueryHandler { query_ctx: QueryContextRef, ) -> Vec>; + async fn do_promql_query( + &self, + query: &str, + query_ctx: QueryContextRef, + ) -> Vec>; + async fn do_statement_query( &self, stmt: Statement, @@ -75,6 +81,22 @@ where .collect() } + async fn do_promql_query( + &self, + query: &str, + query_ctx: QueryContextRef, + ) -> Vec> { + self.0 + .do_promql_query(query, query_ctx) + .await + .into_iter() + .map(|x| { + x.map_err(BoxedError::new) + .context(error::ExecuteQuerySnafu { query }) + }) + .collect() + } + async fn do_statement_query( &self, stmt: Statement, diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index f12cfc7712..7425ec03c3 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -54,6 +54,14 @@ impl SqlQueryHandler for DummyInstance { unimplemented!() } + async fn do_promql_query( + &self, + _: &str, + _: QueryContextRef, + ) -> Vec> { + unimplemented!() + } + async fn do_statement_query( &self, _stmt: sql::statements::statement::Statement, diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs index 32954def9d..70f8c3e070 100644 --- a/src/servers/tests/http/opentsdb_test.rs +++ b/src/servers/tests/http/opentsdb_test.rs @@ -52,6 +52,14 @@ impl SqlQueryHandler for DummyInstance { unimplemented!() } + async fn do_promql_query( + &self, + _: &str, + _: QueryContextRef, + ) -> Vec> { + unimplemented!() + } + async fn do_statement_query( &self, _stmt: sql::statements::statement::Statement, diff --git a/src/servers/tests/http/prometheus_test.rs b/src/servers/tests/http/prometheus_test.rs index 46df57753b..3415730558 100644 --- a/src/servers/tests/http/prometheus_test.rs +++ b/src/servers/tests/http/prometheus_test.rs @@ -77,6 +77,14 @@ impl SqlQueryHandler for DummyInstance { unimplemented!() } + async fn do_promql_query( + &self, + _: &str, + _: QueryContextRef, + ) -> Vec> { + unimplemented!() + } + async fn do_statement_query( &self, _stmt: sql::statements::statement::Statement, diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 6e8e833640..15af6e9706 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -68,6 +68,14 @@ impl SqlQueryHandler for DummyInstance { vec![Ok(output)] } + async fn do_promql_query( + &self, + _: &str, + _: QueryContextRef, + ) -> Vec> { + unimplemented!() + } + async fn do_statement_query( &self, _stmt: sql::statements::statement::Statement, diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 978495cec7..3232b84628 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -40,6 +40,7 @@ use once_cell::sync::OnceCell; use rand::Rng; use servers::grpc::GrpcServer; use servers::http::{HttpOptions, HttpServer}; +use servers::promql::PromqlServer; use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor; use servers::server::Server; @@ -265,7 +266,7 @@ async fn build_frontend_instance(datanode_instance: InstanceRef) -> FeInstance { frontend_instance } -pub async fn setup_test_app(store_type: StorageType, name: &str) -> (Router, TestGuard) { +pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router, TestGuard) { let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); instance.start().await.unwrap(); @@ -283,7 +284,7 @@ pub async fn setup_test_app(store_type: StorageType, name: &str) -> (Router, Tes (http_server.make_app(), guard) } -pub async fn setup_test_app_with_frontend( +pub async fn setup_test_http_app_with_frontend( store_type: StorageType, name: &str, ) -> (Router, TestGuard) { @@ -307,6 +308,26 @@ pub async fn setup_test_app_with_frontend( (app, guard) } +pub async fn setup_test_promql_app_with_frontend( + store_type: StorageType, + name: &str, +) -> (Router, TestGuard) { + let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); + let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); + let frontend = build_frontend_instance(instance.clone()).await; + instance.start().await.unwrap(); + create_test_table( + frontend.catalog_manager(), + instance.sql_handler(), + ConcreteDataType::timestamp_millisecond_datatype(), + ) + .await + .unwrap(); + let promql_server = PromqlServer::create_server(Arc::new(frontend) as _); + let app = promql_server.make_app(); + (app, guard) +} + pub async fn setup_grpc_server( store_type: StorageType, name: &str, diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 7634f9b928..2daa73649d 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -18,7 +18,10 @@ use common_error::status_code::StatusCode as ErrorCode; use serde_json::json; use servers::http::handler::HealthResponse; use servers::http::{JsonOutput, JsonResponse}; -use tests_integration::test_util::{setup_test_app, setup_test_app_with_frontend, StorageType}; +use tests_integration::test_util::{ + setup_test_http_app, setup_test_http_app_with_frontend, setup_test_promql_app_with_frontend, + StorageType, +}; #[macro_export] macro_rules! http_test { @@ -50,6 +53,7 @@ macro_rules! http_tests { $service, test_sql_api, + test_promql_api, test_metrics_api, test_scripts_api, test_health_api, @@ -60,7 +64,7 @@ macro_rules! http_tests { pub async fn test_sql_api(store_type: StorageType) { common_telemetry::init_default_ut_logging(); - let (app, mut guard) = setup_test_app_with_frontend(store_type, "sql_api").await; + let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "sql_api").await; let client = TestClient::new(app); let res = client.get("/v1/sql").send().await; assert_eq!(res.status(), StatusCode::OK); @@ -261,10 +265,35 @@ pub async fn test_sql_api(store_type: StorageType) { guard.remove_all().await; } +pub async fn test_promql_api(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = setup_test_promql_app_with_frontend(store_type, "promql_api").await; + let client = TestClient::new(app); + + // instant query + let res = client.get("/v1/query?query=up").send().await; + assert_eq!(res.status(), StatusCode::OK); + let res = client.post("/v1/query?query=up").send().await; + assert_eq!(res.status(), StatusCode::OK); + + let res = client + .get("/v1/range_query?query=up&start=1&end=100&step=5") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let res = client + .post("/v1/range_query?query=up&start=1&end=100&step=5") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + guard.remove_all().await; +} + pub async fn test_metrics_api(store_type: StorageType) { common_telemetry::init_default_ut_logging(); common_telemetry::init_default_metrics_recorder(); - let (app, mut guard) = setup_test_app(store_type, "metrics_api").await; + let (app, mut guard) = setup_test_http_app(store_type, "metrics_api").await; let client = TestClient::new(app); // Send a sql @@ -284,7 +313,7 @@ pub async fn test_metrics_api(store_type: StorageType) { pub async fn test_scripts_api(store_type: StorageType) { common_telemetry::init_default_ut_logging(); - let (app, mut guard) = setup_test_app_with_frontend(store_type, "script_api").await; + let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "script_api").await; let client = TestClient::new(app); let res = client @@ -325,7 +354,7 @@ def test(n): pub async fn test_health_api(store_type: StorageType) { common_telemetry::init_default_ut_logging(); - let (app, _guard) = setup_test_app_with_frontend(store_type, "health_api").await; + let (app, _guard) = setup_test_http_app_with_frontend(store_type, "health_api").await; let client = TestClient::new(app); // we can call health api with both `GET` and `POST` method.