feat: handle PromQL HTTP API parameters (#985)

* feat: impl EvalStmt parser

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix compile errors

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update test result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add integration test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* resolve CR comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl From<PromqlQuery> for PromQuery

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* move format into with_context

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update test result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* shorthand compound error

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use rfc3339 error to report float parsing error

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove CompoundError

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-02-15 17:15:44 +08:00
committed by GitHub
parent 5d1f231004
commit dfe7bfb07f
17 changed files with 270 additions and 46 deletions

1
Cargo.lock generated
View File

@@ -5469,6 +5469,7 @@ dependencies = [
"arc-swap",
"async-trait",
"catalog",
"chrono",
"common-base",
"common-catalog",
"common-error",

View File

@@ -21,7 +21,7 @@ use common_recordbatch::RecordBatches;
use common_telemetry::logging::info;
use common_telemetry::timer;
use datatypes::schema::Schema;
use query::parser::{QueryLanguageParser, QueryStatement};
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use servers::error as server_error;
use servers::promql::PromqlHandler;
use servers::query_handler::sql::SqlQueryHandler;
@@ -170,7 +170,11 @@ impl Instance {
self.execute_stmt(stmt, query_ctx).await
}
pub async fn execute_promql(&self, promql: &str, query_ctx: QueryContextRef) -> Result<Output> {
pub async fn execute_promql(
&self,
promql: &PromQuery,
query_ctx: QueryContextRef,
) -> Result<Output> {
let stmt = QueryLanguageParser::parse_promql(promql).context(ExecuteSqlSnafu)?;
self.execute_stmt(stmt, query_ctx).await
}
@@ -185,7 +189,13 @@ impl Instance {
lookback: Duration,
query_ctx: QueryContextRef,
) -> Result<Output> {
let mut stmt = QueryLanguageParser::parse_promql(promql).context(ExecuteSqlSnafu)?;
let query = PromQuery {
query: promql.to_string(),
start: "0".to_string(),
end: "0".to_string(),
step: "5m".to_string(),
};
let mut stmt = QueryLanguageParser::parse_promql(&query).context(ExecuteSqlSnafu)?;
match &mut stmt {
QueryStatement::Sql(_) => unreachable!(),
QueryStatement::Promql(eval_stmt) => {
@@ -243,7 +253,7 @@ impl SqlQueryHandler for Instance {
async fn do_promql_query(
&self,
query: &str,
query: &PromQuery,
query_ctx: QueryContextRef,
) -> Vec<Result<Output>> {
let _timer = timer!(metric::METRIC_HANDLE_PROMQL_ELAPSED);
@@ -282,12 +292,18 @@ impl SqlQueryHandler for Instance {
#[async_trait]
impl PromqlHandler for Instance {
async fn do_query(&self, query: &str) -> server_error::Result<Output> {
async fn do_query(&self, query: &PromQuery) -> server_error::Result<Output> {
let _timer = timer!(metric::METRIC_HANDLE_PROMQL_ELAPSED);
self.execute_promql(query, QueryContext::arc())
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu { query })
.with_context(|_| {
let query_literal = format!("{query:?}");
server_error::ExecuteQuerySnafu {
query: query_literal,
}
})
}
}

View File

@@ -44,6 +44,7 @@ use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOpts;
use partition::manager::PartitionRuleManager;
use partition::route::TableRoutes;
use query::parser::PromQuery;
use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
use servers::error as server_error;
use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
@@ -448,12 +449,14 @@ impl SqlQueryHandler for Instance {
}
}
async fn do_promql_query(&self, query: &str, _: QueryContextRef) -> Vec<Result<Output>> {
async fn do_promql_query(&self, query: &PromQuery, _: QueryContextRef) -> Vec<Result<Output>> {
if let Some(handler) = &self.promql_handler {
let result = handler
.do_query(query)
.await
.context(ExecutePromqlSnafu { query });
let result = handler.do_query(query).await.with_context(|_| {
let query_literal = format!("{query:?}");
ExecutePromqlSnafu {
query: query_literal,
}
});
vec![result]
} else {
vec![Err(NotSupportedSnafu {
@@ -523,7 +526,7 @@ impl ScriptHandler for Instance {
#[async_trait]
impl PromqlHandler for Instance {
async fn do_query(&self, query: &str) -> server_error::Result<Output> {
async fn do_query(&self, query: &PromQuery) -> server_error::Result<Output> {
if let Some(promql_handler) = &self.promql_handler {
promql_handler.do_query(query).await
} else {

View File

@@ -42,7 +42,7 @@ use meta_client::rpc::{
RouteResponse, TableName,
};
use partition::partition::{PartitionBound, PartitionDef};
use query::parser::QueryStatement;
use query::parser::{PromQuery, QueryStatement};
use query::sql::{describe_table, explain, show_databases, show_tables};
use query::{QueryEngineFactory, QueryEngineRef};
use servers::query_handler::sql::SqlQueryHandler;
@@ -508,7 +508,7 @@ impl SqlQueryHandler for DistInstance {
async fn do_promql_query(
&self,
_: &str,
_: &PromQuery,
_: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>> {
unimplemented!()

View File

@@ -19,6 +19,7 @@ use async_trait::async_trait;
use common_query::Output;
use datanode::error::Error as DatanodeError;
use datatypes::schema::Schema;
use query::parser::PromQuery;
use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef};
use servers::query_handler::sql::{SqlQueryHandler, SqlQueryHandlerRef};
use session::context::QueryContextRef;
@@ -50,7 +51,7 @@ impl SqlQueryHandler for StandaloneSqlQueryHandler {
async fn do_promql_query(
&self,
_: &str,
_: &PromQuery,
_: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>> {
unimplemented!()

View File

@@ -8,6 +8,7 @@ license.workspace = true
arc-swap = "1.0"
async-trait = "0.1"
catalog = { path = "../catalog" }
chrono.workspace = true
common-base = { path = "../common/base" }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }

View File

@@ -74,6 +74,20 @@ pub enum Error {
#[snafu(display("Failed to convert datatype: {}", source))]
Datatype { source: datatypes::error::Error },
#[snafu(display("Failed to parse timestamp `{}`: {}", raw, source))]
ParseTimestamp {
raw: String,
source: chrono::ParseError,
backtrace: Backtrace,
},
#[snafu(display("Failed to parse float number `{}`: {}", raw, source))]
ParseFloat {
raw: String,
source: std::num::ParseFloatError,
backtrace: Backtrace,
},
}
impl ErrorExt for Error {
@@ -85,7 +99,9 @@ impl ErrorExt for Error {
UnsupportedExpr { .. }
| CatalogNotFound { .. }
| SchemaNotFound { .. }
| TableNotFound { .. } => StatusCode::InvalidArguments,
| TableNotFound { .. }
| ParseTimestamp { .. }
| ParseFloat { .. } => StatusCode::InvalidArguments,
QueryAccessDenied { .. } => StatusCode::AccessDenied,
Catalog { source } => source.status_code(),
VectorComputation { source } => source.status_code(),

View File

@@ -12,8 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use std::time::{Duration, SystemTime};
use chrono::DateTime;
use common_error::ext::PlainError;
use common_error::prelude::BoxedError;
use common_error::status_code::StatusCode;
@@ -24,15 +25,27 @@ use sql::dialect::GenericDialect;
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
use crate::error::{MultipleStatementsSnafu, QueryParseSnafu, Result};
use crate::error::{
MultipleStatementsSnafu, ParseFloatSnafu, ParseTimestampSnafu, QueryParseSnafu, Result,
};
use crate::metric::{METRIC_PARSE_PROMQL_ELAPSED, METRIC_PARSE_SQL_ELAPSED};
const DEFAULT_LOOKBACK: u64 = 5 * 60; // 5m
#[derive(Debug, Clone)]
pub enum QueryStatement {
Sql(Statement),
Promql(EvalStmt),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PromQuery {
pub query: String,
pub start: String,
pub end: String,
pub step: String,
}
pub struct QueryLanguageParser {}
impl QueryLanguageParser {
@@ -54,25 +67,75 @@ impl QueryLanguageParser {
}
// TODO(ruihang): implement this method when parser is ready.
pub fn parse_promql(promql: &str) -> Result<QueryStatement> {
pub fn parse_promql(query: &PromQuery) -> Result<QueryStatement> {
let _timer = timer!(METRIC_PARSE_PROMQL_ELAPSED);
let prom_expr = promql_parser::parser::parse(promql)
let expr = promql_parser::parser::parse(&query.query)
.map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
.context(QueryParseSnafu { query: promql })?;
.context(QueryParseSnafu {
query: &query.query,
})?;
let start = Self::parse_promql_timestamp(&query.start)
.map_err(BoxedError::new)
.context(QueryParseSnafu {
query: &query.query,
})?;
let end = Self::parse_promql_timestamp(&query.end)
.map_err(BoxedError::new)
.context(QueryParseSnafu {
query: &query.query,
})?;
let step = promql_parser::util::parse_duration(&query.step)
.map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
.context(QueryParseSnafu {
query: &query.query,
})?;
let eval_stmt = EvalStmt {
expr: prom_expr,
start: std::time::UNIX_EPOCH,
end: std::time::UNIX_EPOCH
.checked_add(Duration::from_secs(100))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
expr,
start,
end,
interval: step,
// TODO(ruihang): provide a way to adjust this parameter.
lookback_delta: Duration::from_secs(DEFAULT_LOOKBACK),
};
Ok(QueryStatement::Promql(eval_stmt))
}
fn parse_promql_timestamp(timestamp: &str) -> Result<SystemTime> {
// try rfc3339 format
let rfc3339_result = DateTime::parse_from_rfc3339(timestamp)
.context(ParseTimestampSnafu { raw: timestamp })
.map(Into::<SystemTime>::into);
// shorthand
if rfc3339_result.is_ok() {
return rfc3339_result;
}
// try float format
timestamp
.parse::<f64>()
.context(ParseFloatSnafu { raw: timestamp })
.map(|float| {
let duration = Duration::from_secs_f64(float);
SystemTime::UNIX_EPOCH
.checked_add(duration)
.unwrap_or(max_system_timestamp())
})
// also report rfc3339 error if float parsing fails
.map_err(|_| rfc3339_result.unwrap_err())
}
}
fn max_system_timestamp() -> SystemTime {
SystemTime::UNIX_EPOCH
.checked_add(Duration::from_secs(std::i64::MAX as u64))
.unwrap()
}
#[cfg(test)]
@@ -111,4 +174,78 @@ mod test {
assert_eq!(format!("{stmt:?}"), expected);
}
#[test]
fn parse_promql_timestamp() {
let cases = vec![
(
"1435781451.781",
SystemTime::UNIX_EPOCH
.checked_add(Duration::from_secs_f64(1435781451.781))
.unwrap(),
),
("0.000", SystemTime::UNIX_EPOCH),
("00", SystemTime::UNIX_EPOCH),
(
// i64::MAX + 1
"9223372036854775808.000",
max_system_timestamp(),
),
(
"2015-07-01T20:10:51.781Z",
SystemTime::UNIX_EPOCH
.checked_add(Duration::from_secs_f64(1435781451.781))
.unwrap(),
),
("1970-01-01T00:00:00.000Z", SystemTime::UNIX_EPOCH),
];
for (input, expected) in cases {
let result = QueryLanguageParser::parse_promql_timestamp(input).unwrap();
let result = result
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis();
let expected = expected
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis();
// assert difference < 0.1 second
assert!(result.abs_diff(expected) < 100);
}
}
#[test]
fn parse_promql_simple() {
let promql = PromQuery {
query: "http_request".to_string(),
start: "2022-02-13T17:14:00Z".to_string(),
end: "2023-02-13T17:14:00Z".to_string(),
step: "1d".to_string(),
};
let expected = String::from(
"\
Promql(EvalStmt { \
expr: VectorSelector(VectorSelector { \
name: Some(\"http_request\"), \
matchers: Matchers { \
matchers: {Matcher { \
op: Equal, \
name: \"__name__\", \
value: \"http_request\" \
}} }, \
offset: None, at: None }), \
start: SystemTime { tv_sec: 1644772440, tv_nsec: 0 }, \
end: SystemTime { tv_sec: 1676308440, tv_nsec: 0 }, \
interval: 86400s, \
lookback_delta: 300s \
})",
);
let result = QueryLanguageParser::parse_promql(&promql).unwrap();
assert_eq!(format!("{result:?}"), expected);
}
}

View File

@@ -563,6 +563,7 @@ mod test {
use datatypes::prelude::*;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{StringVector, UInt32Vector};
use query::parser::PromQuery;
use session::context::QueryContextRef;
use tokio::sync::mpsc;
@@ -584,7 +585,7 @@ mod test {
async fn do_promql_query(
&self,
_: &str,
_: &PromQuery,
_: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>> {
unimplemented!()

View File

@@ -20,6 +20,7 @@ use axum::extract::{Json, Query, State};
use axum::{Extension, Form};
use common_error::status_code::StatusCode;
use common_telemetry::metric;
use query::parser::PromQuery;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use session::context::UserInfo;
@@ -72,6 +73,17 @@ pub struct PromqlQuery {
pub db: Option<String>,
}
impl From<PromqlQuery> for PromQuery {
fn from(query: PromqlQuery) -> Self {
PromQuery {
query: query.query,
start: query.start,
end: query.end,
step: query.step,
}
}
}
/// Handler to execute promql
#[axum_macros::debug_handler]
pub async fn promql(
@@ -81,16 +93,18 @@ pub async fn promql(
_user_info: Extension<UserInfo>,
) -> Json<JsonResponse> {
let sql_handler = &state.sql_handler;
let start = Instant::now();
let resp = match super::query_context_from_db(sql_handler.clone(), params.db) {
let exec_start = Instant::now();
let db = params.db.clone();
let prom_query = params.into();
let resp = match super::query_context_from_db(sql_handler.clone(), db) {
Ok(query_ctx) => {
JsonResponse::from_output(sql_handler.do_promql_query(&params.query, query_ctx).await)
JsonResponse::from_output(sql_handler.do_promql_query(&prom_query, query_ctx).await)
.await
}
Err(resp) => resp,
};
Json(resp.with_execution_time(start.elapsed().as_millis()))
Json(resp.with_execution_time(exec_start.elapsed().as_millis()))
}
pub(crate) fn sql_docs(op: TransformOperation) -> TransformOperation {

View File

@@ -25,6 +25,7 @@ use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::info;
use futures::FutureExt;
use query::parser::PromQuery;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
@@ -45,7 +46,7 @@ pub type PromqlHandlerRef = Arc<dyn PromqlHandler + Send + Sync>;
#[async_trait]
pub trait PromqlHandler {
async fn do_query(&self, query: &str) -> Result<Output>;
async fn do_query(&self, query: &PromQuery) -> Result<Output>;
}
pub struct PromqlServer {
@@ -250,6 +251,12 @@ pub async fn range_query(
State(handler): State<PromqlHandlerRef>,
Query(params): Query<RangeQuery>,
) -> Json<PromqlJsonResponse> {
let result = handler.do_query(&params.query).await;
let prom_query = PromQuery {
query: params.query,
start: params.start,
end: params.end,
step: params.step,
};
let result = handler.do_query(&prom_query).await;
PromqlJsonResponse::from_query_result(result).await
}

View File

@@ -18,6 +18,7 @@ use async_trait::async_trait;
use common_error::prelude::*;
use common_query::Output;
use datatypes::schema::Schema;
use query::parser::PromQuery;
use session::context::QueryContextRef;
use sql::statements::statement::Statement;
@@ -38,7 +39,7 @@ pub trait SqlQueryHandler {
async fn do_promql_query(
&self,
query: &str,
query: &PromQuery,
query_ctx: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>>;
@@ -91,7 +92,7 @@ where
async fn do_promql_query(
&self,
query: &str,
query: &PromQuery,
query_ctx: QueryContextRef,
) -> Vec<Result<Output>> {
self.0
@@ -99,8 +100,12 @@ where
.await
.into_iter()
.map(|x| {
x.map_err(BoxedError::new)
.context(error::ExecuteQuerySnafu { query })
x.map_err(BoxedError::new).with_context(|_| {
let query_literal = format!("{query:?}");
error::ExecuteQuerySnafu {
query: query_literal,
}
})
})
.collect()
}

View File

@@ -20,6 +20,7 @@ use axum::{http, Router};
use axum_test_helper::TestClient;
use common_query::Output;
use datatypes::schema::Schema;
use query::parser::PromQuery;
use servers::error::{Error, Result};
use servers::http::{HttpOptions, HttpServer};
use servers::influxdb::InfluxdbRequest;
@@ -57,7 +58,7 @@ impl SqlQueryHandler for DummyInstance {
async fn do_promql_query(
&self,
_: &str,
_: &PromQuery,
_: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>> {
unimplemented!()

View File

@@ -19,6 +19,7 @@ use axum::Router;
use axum_test_helper::TestClient;
use common_query::Output;
use datatypes::schema::Schema;
use query::parser::PromQuery;
use servers::error::{self, Result};
use servers::http::{HttpOptions, HttpServer};
use servers::opentsdb::codec::DataPoint;
@@ -55,7 +56,7 @@ impl SqlQueryHandler for DummyInstance {
async fn do_promql_query(
&self,
_: &str,
_: &PromQuery,
_: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>> {
unimplemented!()

View File

@@ -23,6 +23,7 @@ use axum_test_helper::TestClient;
use common_query::Output;
use datatypes::schema::Schema;
use prost::Message;
use query::parser::PromQuery;
use servers::error::{Error, Result};
use servers::http::{HttpOptions, HttpServer};
use servers::prometheus;
@@ -80,7 +81,7 @@ impl SqlQueryHandler for DummyInstance {
async fn do_promql_query(
&self,
_: &str,
_: &PromQuery,
_: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>> {
unimplemented!()

View File

@@ -21,7 +21,7 @@ use catalog::{CatalogList, CatalogProvider, SchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use datatypes::schema::Schema;
use query::parser::{QueryLanguageParser, QueryStatement};
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::{QueryEngineFactory, QueryEngineRef};
use script::engine::{CompileContext, EvalContext, Script, ScriptEngine};
use script::python::{PyEngine, PyScript};
@@ -72,7 +72,7 @@ impl SqlQueryHandler for DummyInstance {
async fn do_promql_query(
&self,
_: &str,
_: &PromQuery,
_: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>> {
unimplemented!()

View File

@@ -53,7 +53,8 @@ macro_rules! http_tests {
$service,
test_sql_api,
test_promql_api,
test_prometheus_promql_api,
test_promql_http_api,
test_metrics_api,
test_scripts_api,
test_health_api,
@@ -265,7 +266,25 @@ pub async fn test_sql_api(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_promql_api(store_type: StorageType) {
pub async fn test_prometheus_promql_api(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "sql_api").await;
let client = TestClient::new(app);
let res = client
.get("/v1/promql?query=abs(demo{host=\"Hangzhou\"})&start=0&end=100&step=5s")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<JsonResponse>(&res.text().await).unwrap();
assert!(body.success());
assert!(body.execution_time_ms().is_some());
guard.remove_all().await;
}
pub async fn test_promql_http_api(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_promql_app_with_frontend(store_type, "promql_api").await;
let client = TestClient::new(app);