mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-23 08:20:36 +00:00
feat(promql): parameterize lookback (#3630)
* feat(promql): parameterize lookback * chore(promql): address CR, adjusted sqlness * chore(promql): fmt * chore(promql): fix accidental removal * fix(promql): address CR * fix(promql): address CR * feat(promql): add initial lookback parameter grpc support * fix: update greptime-proto revision * chore: restore accidental removal
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -3775,7 +3775,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=8da84a04b137c4104262459807eab1c04b92f3cc#8da84a04b137c4104262459807eab1c04b92f3cc"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=04d78b6e025ceb518040fdd10858c2a9d9345820#04d78b6e025ceb518040fdd10858c2a9d9345820"
|
||||
dependencies = [
|
||||
"prost 0.12.3",
|
||||
"serde",
|
||||
|
||||
@@ -104,7 +104,7 @@ etcd-client = "0.12"
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "8da84a04b137c4104262459807eab1c04b92f3cc" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "04d78b6e025ceb518040fdd10858c2a9d9345820" }
|
||||
humantime = "2.1"
|
||||
humantime-serde = "1.1"
|
||||
itertools = "0.10"
|
||||
|
||||
@@ -37,6 +37,8 @@ use snafu::{ensure, ResultExt};
|
||||
use crate::error::{ConvertFlightDataSnafu, Error, IllegalFlightMessagesSnafu, ServerSnafu};
|
||||
use crate::{error, from_grpc_response, metrics, Client, Result, StreamInserter};
|
||||
|
||||
pub const DEFAULT_LOOKBACK_STRING: &str = "5m";
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct Database {
|
||||
// The "catalog" and "schema" to be used in processing the requests at the server side.
|
||||
@@ -215,6 +217,7 @@ impl Database {
|
||||
start: start.to_string(),
|
||||
end: end.to_string(),
|
||||
step: step.to_string(),
|
||||
lookback: DEFAULT_LOOKBACK_STRING.to_string(),
|
||||
})),
|
||||
}))
|
||||
.await
|
||||
|
||||
@@ -85,6 +85,7 @@ impl GrpcQueryHandler for Instance {
|
||||
start: promql.start,
|
||||
end: promql.end,
|
||||
step: promql.step,
|
||||
lookback: promql.lookback,
|
||||
};
|
||||
let mut result =
|
||||
SqlQueryHandler::do_promql_query(self, &prom_query, ctx.clone()).await;
|
||||
|
||||
@@ -18,7 +18,7 @@ use common_query::Output;
|
||||
use common_telemetry::tracing;
|
||||
use query::parser::{
|
||||
PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME,
|
||||
EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME,
|
||||
DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME,
|
||||
};
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ResultExt;
|
||||
@@ -37,12 +37,16 @@ impl StatementExecutor {
|
||||
end: eval.end,
|
||||
step: eval.step,
|
||||
query: eval.query,
|
||||
lookback: eval.lookback.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
|
||||
};
|
||||
QueryLanguageParser::parse_promql(&promql, &query_ctx).context(ParseQuerySnafu)?
|
||||
}
|
||||
Tql::Explain(explain) => {
|
||||
let promql = PromQuery {
|
||||
query: explain.query,
|
||||
lookback: explain
|
||||
.lookback
|
||||
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
|
||||
..PromQuery::default()
|
||||
};
|
||||
let explain_node_name = if explain.is_verbose {
|
||||
@@ -63,6 +67,9 @@ impl StatementExecutor {
|
||||
end: analyze.end,
|
||||
step: analyze.step,
|
||||
query: analyze.query,
|
||||
lookback: analyze
|
||||
.lookback
|
||||
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
|
||||
};
|
||||
let analyze_node_name = if analyze.is_verbose {
|
||||
ANALYZE_VERBOSE_NODE_NAME
|
||||
|
||||
@@ -36,7 +36,6 @@ use crate::error::{
|
||||
};
|
||||
use crate::metrics::{PARSE_PROMQL_ELAPSED, PARSE_SQL_ELAPSED};
|
||||
|
||||
const DEFAULT_LOOKBACK: u64 = 5 * 60; // 5m
|
||||
pub const DEFAULT_LOOKBACK_STRING: &str = "5m";
|
||||
pub const EXPLAIN_NODE_NAME: &str = "EXPLAIN";
|
||||
pub const EXPLAIN_VERBOSE_NODE_NAME: &str = "EXPLAIN VERBOSE";
|
||||
@@ -98,6 +97,7 @@ pub struct PromQuery {
|
||||
pub start: String,
|
||||
pub end: String,
|
||||
pub step: String,
|
||||
pub lookback: String,
|
||||
}
|
||||
|
||||
impl Default for PromQuery {
|
||||
@@ -107,6 +107,7 @@ impl Default for PromQuery {
|
||||
start: String::from("0"),
|
||||
end: String::from("0"),
|
||||
step: String::from("5m"),
|
||||
lookback: String::from(DEFAULT_LOOKBACK_STRING),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -165,13 +166,22 @@ impl QueryLanguageParser {
|
||||
query: &query.query,
|
||||
})?;
|
||||
|
||||
let lookback_delta = query
|
||||
.lookback
|
||||
.parse::<u64>()
|
||||
.map(Duration::from_secs)
|
||||
.or_else(|_| promql_parser::util::parse_duration(&query.lookback))
|
||||
.map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
|
||||
.context(QueryParseSnafu {
|
||||
query: &query.query,
|
||||
})?;
|
||||
|
||||
let eval_stmt = EvalStmt {
|
||||
expr,
|
||||
start,
|
||||
end,
|
||||
interval: step,
|
||||
// TODO(ruihang): provide a way to adjust this parameter.
|
||||
lookback_delta: Duration::from_secs(DEFAULT_LOOKBACK),
|
||||
lookback_delta,
|
||||
};
|
||||
|
||||
Ok(QueryStatement::Promql(eval_stmt))
|
||||
@@ -353,6 +363,7 @@ mod test {
|
||||
start: "2022-02-13T17:14:00Z".to_string(),
|
||||
end: "2023-02-13T17:14:00Z".to_string(),
|
||||
step: "1d".to_string(),
|
||||
lookback: "5m".to_string(),
|
||||
};
|
||||
|
||||
#[cfg(not(windows))]
|
||||
|
||||
@@ -347,7 +347,7 @@ pub enum Error {
|
||||
|
||||
#[snafu(display("Failed to parse PromQL: {query:?}"))]
|
||||
ParsePromQL {
|
||||
query: PromQuery,
|
||||
query: Box<PromQuery>,
|
||||
location: Location,
|
||||
source: query::error::Error,
|
||||
},
|
||||
|
||||
@@ -58,6 +58,7 @@ impl PrometheusGateway for PrometheusGatewayService {
|
||||
start: range_query.start,
|
||||
end: range_query.end,
|
||||
step: range_query.step,
|
||||
lookback: range_query.lookback,
|
||||
}
|
||||
}
|
||||
Promql::InstantQuery(instant_query) => {
|
||||
@@ -71,6 +72,7 @@ impl PrometheusGateway for PrometheusGatewayService {
|
||||
start: time.clone(),
|
||||
end: time,
|
||||
step: String::from("1s"),
|
||||
lookback: instant_query.lookback,
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -26,7 +26,7 @@ use common_plugins::GREPTIME_EXEC_WRITE_COST;
|
||||
use common_query::{Output, OutputData};
|
||||
use common_recordbatch::util;
|
||||
use common_telemetry::tracing;
|
||||
use query::parser::PromQuery;
|
||||
use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING};
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
@@ -205,6 +205,7 @@ pub struct PromqlQuery {
|
||||
pub start: String,
|
||||
pub end: String,
|
||||
pub step: String,
|
||||
pub lookback: Option<String>,
|
||||
pub db: Option<String>,
|
||||
}
|
||||
|
||||
@@ -215,6 +216,9 @@ impl From<PromqlQuery> for PromQuery {
|
||||
start: query.start,
|
||||
end: query.end,
|
||||
step: query.step,
|
||||
lookback: query
|
||||
.lookback
|
||||
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -152,6 +152,7 @@ pub async fn build_info_query() -> PrometheusJsonResponse {
|
||||
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
|
||||
pub struct InstantQuery {
|
||||
query: Option<String>,
|
||||
lookback: Option<String>,
|
||||
time: Option<String>,
|
||||
timeout: Option<String>,
|
||||
db: Option<String>,
|
||||
@@ -178,6 +179,10 @@ pub async fn instant_query(
|
||||
start: time.clone(),
|
||||
end: time,
|
||||
step: "1s".to_string(),
|
||||
lookback: params
|
||||
.lookback
|
||||
.or(form_params.lookback)
|
||||
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
|
||||
};
|
||||
|
||||
let result = handler.do_query(&prom_query, query_ctx).await;
|
||||
@@ -196,6 +201,7 @@ pub struct RangeQuery {
|
||||
start: Option<String>,
|
||||
end: Option<String>,
|
||||
step: Option<String>,
|
||||
lookback: Option<String>,
|
||||
timeout: Option<String>,
|
||||
db: Option<String>,
|
||||
}
|
||||
@@ -216,6 +222,10 @@ pub async fn range_query(
|
||||
start: params.start.or(form_params.start).unwrap_or_default(),
|
||||
end: params.end.or(form_params.end).unwrap_or_default(),
|
||||
step: params.step.or(form_params.step).unwrap_or_default(),
|
||||
lookback: params
|
||||
.lookback
|
||||
.or(form_params.lookback)
|
||||
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
|
||||
};
|
||||
|
||||
let result = handler.do_query(&prom_query, query_ctx).await;
|
||||
@@ -235,6 +245,7 @@ struct Matches(Vec<String>);
|
||||
pub struct LabelsQuery {
|
||||
start: Option<String>,
|
||||
end: Option<String>,
|
||||
lookback: Option<String>,
|
||||
#[serde(flatten)]
|
||||
matches: Matches,
|
||||
db: Option<String>,
|
||||
@@ -310,6 +321,11 @@ pub async fn labels_query(
|
||||
.or(form_params.end)
|
||||
.unwrap_or_else(current_time_rfc3339);
|
||||
|
||||
let lookback = params
|
||||
.lookback
|
||||
.or(form_params.lookback)
|
||||
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string());
|
||||
|
||||
let mut labels = HashSet::new();
|
||||
let _ = labels.insert(METRIC_NAME.to_string());
|
||||
|
||||
@@ -320,6 +336,7 @@ pub async fn labels_query(
|
||||
start: start.clone(),
|
||||
end: end.clone(),
|
||||
step: DEFAULT_LOOKBACK_STRING.to_string(),
|
||||
lookback: lookback.clone(),
|
||||
};
|
||||
|
||||
let result = handler.do_query(&prom_query, query_ctx.clone()).await;
|
||||
@@ -546,6 +563,7 @@ fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
|
||||
pub struct LabelValueQuery {
|
||||
start: Option<String>,
|
||||
end: Option<String>,
|
||||
lookback: Option<String>,
|
||||
#[serde(flatten)]
|
||||
matches: Matches,
|
||||
db: Option<String>,
|
||||
@@ -587,6 +605,9 @@ pub async fn label_values_query(
|
||||
|
||||
let start = params.start.unwrap_or_else(yesterday_rfc3339);
|
||||
let end = params.end.unwrap_or_else(current_time_rfc3339);
|
||||
let lookback = params
|
||||
.lookback
|
||||
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string());
|
||||
|
||||
let mut label_values = HashSet::new();
|
||||
|
||||
@@ -597,6 +618,7 @@ pub async fn label_values_query(
|
||||
start: start.clone(),
|
||||
end: end.clone(),
|
||||
step: DEFAULT_LOOKBACK_STRING.to_string(),
|
||||
lookback: lookback.clone(),
|
||||
};
|
||||
let result = handler.do_query(&prom_query, query_ctx.clone()).await;
|
||||
if let Err(err) =
|
||||
@@ -695,6 +717,7 @@ async fn retrieve_label_values_from_record_batch(
|
||||
pub struct SeriesQuery {
|
||||
start: Option<String>,
|
||||
end: Option<String>,
|
||||
lookback: Option<String>,
|
||||
#[serde(flatten)]
|
||||
matches: Matches,
|
||||
db: Option<String>,
|
||||
@@ -726,6 +749,10 @@ pub async fn series_query(
|
||||
.end
|
||||
.or(form_params.end)
|
||||
.unwrap_or_else(current_time_rfc3339);
|
||||
let lookback = params
|
||||
.lookback
|
||||
.or(form_params.lookback)
|
||||
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string());
|
||||
|
||||
let mut series = Vec::new();
|
||||
let mut merge_map = HashMap::new();
|
||||
@@ -737,6 +764,7 @@ pub async fn series_query(
|
||||
end: end.clone(),
|
||||
// TODO: find a better value for step
|
||||
step: DEFAULT_LOOKBACK_STRING.to_string(),
|
||||
lookback: lookback.clone(),
|
||||
};
|
||||
let result = handler.do_query(&prom_query, query_ctx.clone()).await;
|
||||
|
||||
|
||||
@@ -193,6 +193,7 @@ impl GrpcQueryHandler for DummyInstance {
|
||||
start: promql.start,
|
||||
end: promql.end,
|
||||
step: promql.step,
|
||||
lookback: promql.lookback,
|
||||
};
|
||||
let mut result =
|
||||
SqlQueryHandler::do_promql_query(self, &prom_query, ctx).await;
|
||||
|
||||
@@ -790,6 +790,7 @@ CREATE TABLE {table_name} (
|
||||
start: "1672557973".to_owned(),
|
||||
end: "1672557978".to_owned(),
|
||||
step: "1s".to_owned(),
|
||||
lookback: "5m".to_string(),
|
||||
})),
|
||||
});
|
||||
let output = query(instance, request).await;
|
||||
|
||||
@@ -504,6 +504,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
|
||||
let instant_query = PromInstantQuery {
|
||||
query: "test".to_string(),
|
||||
time: "5".to_string(),
|
||||
lookback: "5m".to_string(),
|
||||
};
|
||||
let instant_query_request = PromqlRequest {
|
||||
header: Some(header.clone()),
|
||||
@@ -555,6 +556,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
|
||||
start: "0".to_string(),
|
||||
end: "10".to_string(),
|
||||
step: "5s".to_string(),
|
||||
lookback: "5m".to_string(),
|
||||
};
|
||||
let range_query_request: PromqlRequest = PromqlRequest {
|
||||
header: Some(header.clone()),
|
||||
@@ -605,6 +607,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
|
||||
start: "1000000000".to_string(),
|
||||
end: "1000001000".to_string(),
|
||||
step: "5s".to_string(),
|
||||
lookback: "5m".to_string(),
|
||||
};
|
||||
let range_query_request: PromqlRequest = PromqlRequest {
|
||||
header: Some(header),
|
||||
|
||||
@@ -39,7 +39,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test;
|
||||
+-+-+
|
||||
| plan_type_| plan_|
|
||||
+-+-+
|
||||
| Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[1000], time index=[j], REDACTED
|
||||
| Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[2000], interval=[1000], time index=[j], REDACTED
|
||||
|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED
|
||||
|_|_PromSeriesDivideExec: tags=["k"], REDACTED
|
||||
|
||||
@@ -34,21 +34,21 @@ TQL EXPLAIN (0, 10, '5s') test;
|
||||
-- SQLNESS REPLACE (peers.*) REDACTED
|
||||
TQL EXPLAIN (0, 10, '1s', '2s') test;
|
||||
|
||||
+---------------+-----------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+-----------------------------------------------------------------------------------------------+
|
||||
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
|
||||
| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] |
|
||||
| | PromSeriesDivide: tags=["k"] |
|
||||
| | MergeScan [is_placeholder=false] |
|
||||
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
|
||||
+---------------+---------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+---------------------------------------------------------------------------------------------+
|
||||
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[2000], interval=[300000], time index=[j] |
|
||||
| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] |
|
||||
| | PromSeriesDivide: tags=["k"] |
|
||||
| | MergeScan [is_placeholder=false] |
|
||||
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[2000], interval=[300000], time index=[j] |
|
||||
| | RepartitionExec: partitioning=REDACTED
|
||||
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
|
||||
| | PromSeriesDivideExec: tags=["k"] |
|
||||
| | SortExec: expr=[k@2 ASC NULLS LAST] |
|
||||
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
|
||||
| | PromSeriesDivideExec: tags=["k"] |
|
||||
| | SortExec: expr=[k@2 ASC NULLS LAST] |
|
||||
| | MergeScanExec: REDACTED
|
||||
| | |
|
||||
+---------------+-----------------------------------------------------------------------------------------------+
|
||||
| | |
|
||||
+---------------+---------------------------------------------------------------------------------------------+
|
||||
|
||||
-- explain at 0s, 5s and 10s. No point at 0s.
|
||||
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
|
||||
|
||||
@@ -76,7 +76,6 @@ TQL EVAL (0, 10, '5s') test{k="a"};
|
||||
| 2.0 | 1970-01-01T00:00:10 | a |
|
||||
+-----+---------------------+---+
|
||||
|
||||
-- 'lookback' parameter is not fully supported, the test has to be updated
|
||||
TQL EVAL (0, 10, '1s', '2s') test{k="a"};
|
||||
|
||||
+-----+---------------------+---+
|
||||
@@ -84,14 +83,6 @@ TQL EVAL (0, 10, '1s', '2s') test{k="a"};
|
||||
+-----+---------------------+---+
|
||||
| 2.0 | 1970-01-01T00:00:01 | a |
|
||||
| 2.0 | 1970-01-01T00:00:02 | a |
|
||||
| 2.0 | 1970-01-01T00:00:03 | a |
|
||||
| 2.0 | 1970-01-01T00:00:04 | a |
|
||||
| 2.0 | 1970-01-01T00:00:05 | a |
|
||||
| 2.0 | 1970-01-01T00:00:06 | a |
|
||||
| 2.0 | 1970-01-01T00:00:07 | a |
|
||||
| 2.0 | 1970-01-01T00:00:08 | a |
|
||||
| 2.0 | 1970-01-01T00:00:09 | a |
|
||||
| 2.0 | 1970-01-01T00:00:10 | a |
|
||||
+-----+---------------------+---+
|
||||
|
||||
TQL EVAL ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp + '10 seconds'::interval, '1s') test{k="a"};
|
||||
|
||||
@@ -25,7 +25,6 @@ TQL EVAL (0, 10, '5s') {__name__!="test"};
|
||||
-- the point at 1ms will be shadowed by the point at 2ms
|
||||
TQL EVAL (0, 10, '5s') test{k="a"};
|
||||
|
||||
-- 'lookback' parameter is not fully supported, the test has to be updated
|
||||
TQL EVAL (0, 10, '1s', '2s') test{k="a"};
|
||||
|
||||
TQL EVAL ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp + '10 seconds'::interval, '1s') test{k="a"};
|
||||
|
||||
Reference in New Issue
Block a user